/**
Module with queue
*/
module mutils.container_shared.shared_queue;

import core.atomic;
import std.experimental.allocator;
import std.experimental.allocator.building_blocks.bitmapped_block : SharedBitmappedBlock;
import std.experimental.allocator.common : platformAlignment;
import std.experimental.allocator.mallocator : Mallocator;
import std.typecons : Flag, Yes, No;

import mutils.thread;

/// Non-copyable facade that forwards object creation and destruction to
/// `Mallocator.instance`.
struct MyMallocator {
	@disable this(this);

	/// Allocates and constructs a `T` from `args` through `Mallocator.instance`.
	auto make(T, Args...)(auto ref Args args) {
		return Mallocator.instance.make!T(args);
	}

	/// Destroys and frees `obj` through `Mallocator.instance`.
	void dispose(T)(ref T* obj) {
		Mallocator.instance.dispose(obj);
	}
}

/**
Linked-list FIFO queue with one lock for producers and a separate lock for
consumers.

The list always starts with a sentinel node: `first` points at a node whose
value has already been handed out (or is `T.init` right after `initialize`),
and the oldest pending element lives in `first.next`.

`initialize` must be called before `add`, `pop` or `tryPop`.

NOTE(review): `Node.next` is annotated `//atomic`, and `core.atomic` is
imported, but every access in this struct is a plain load/store. When the
queue holds only the sentinel, a producer writes `last.next` while a consumer
reads `first.next` on the same node under different locks — confirm whether
`MutexSpinLock` provides the required ordering or whether atomic
acquire/release accesses are needed here.
*/
struct LowLockQueue(T, CType = int) {
	@disable this(this);
private:
	/// Singly linked list cell.
	static struct Node {
		this(T val) {
			value = val;
		}

		T value;
		align(64) Node* next; //atomic
	}

	// for one consumer at a time
	align(64) Node* first;
	// shared among consumers
	MutexSpinLock consumerLock;

	// for one producer at a time
	align(64) Node* last;
	// shared among producers
	MutexSpinLock producerLock;

	alias Allocator = SharedBitmappedBlock!(Node.sizeof, platformAlignment,
			Mallocator, No.multiblock);

	Allocator* allocator;

	/// Returns `node` unchanged, or raises the runtime out-of-memory error when
	/// it is null. `make` reports allocation failure by returning null; linking
	/// such a pointer into the list would lose the element and leave `last`
	/// null for the next producer.
	static Node* ensureAllocated(Node* node) {
		if (node is null) {
			import core.exception : onOutOfMemoryError;

			onOutOfMemoryError();
		}
		return node;
	}

	/// Dequeue step shared by `pop` and `tryPop`.
	/// Must be entered with `consumerLock` held; always releases it before
	/// returning. Returns `T.init` when no element is pending.
	T takeFrontAndUnlock() {
		T varInit;
		Node* oldSentinel = first;
		Node* newSentinel = oldSentinel.next;
		if (newSentinel is null) {
			consumerLock.unlock();
			return varInit;
		}

		T result = newSentinel.value;
		// The node that carried the value becomes the new sentinel, so reset
		// its payload instead of keeping a second copy alive in the list.
		newSentinel.value = varInit;
		first = newSentinel;
		consumerLock.unlock();

		// The old sentinel is unreachable from the list now; free it outside
		// the lock to keep the critical section short.
		allocator.dispose(oldSentinel);
		return result;
	}

public:
	/**
	Creates the node allocator, the initial sentinel node and both locks.

	Params:
		capacityBytes = capacity passed to the node allocator's constructor;
		                defaults to the previously hard-coded 1 GiB.
	*/
	void initialize(size_t capacityBytes = 1024 * 1024 * 1024) {
		allocator = new Allocator(capacityBytes);
		first = ensureAllocated(allocator.make!(Node)(T.init));
		last = first;
		consumerLock.initialzie();
		producerLock.initialzie();
	}

	/// Checks (via `assert`) that the queue is empty. Does not remove elements.
	void clear() {
		assert(empty == true);
	}

	~this() {
		clear();
	}

	/// Returns `true` when no element is pending. A queue on which
	/// `initialize` was never called is reported as empty, which also makes
	/// destroying such a queue safe (the destructor ends up here).
	/// The consumer lock is not taken.
	bool empty() {
		if (first is null) {
			return true;
		}
		//consumerLock.lock();
		bool isEmpty = first.next is null;
		//consumerLock.unlock();
		return isEmpty;
	}

	/// Appends one element. The node is allocated before `producerLock` is
	/// taken, so only the two pointer updates run under the lock.
	void add(T t) {
		Node* tmp = ensureAllocated(allocator.make!(Node)(t));

		producerLock.lock();
		last.next = tmp;
		last = tmp;
		producerLock.unlock();
	}

	/// Appends all elements of `t` in order. The nodes are chained together
	/// first and then linked into the queue under a single acquisition of
	/// `producerLock`. An empty slice is a no-op.
	void add(T[] t) {
		if (t.length == 0) {
			return;
		}

		Node* firstInChain = ensureAllocated(allocator.make!(Node)(t[0]));
		Node* lastInChain = firstInChain;
		foreach (n; 1 .. t.length) {
			Node* tmp = ensureAllocated(allocator.make!(Node)(t[n]));
			lastInChain.next = tmp;
			lastInChain = tmp;
		}

		producerLock.lock();
		last.next = firstInChain;
		last = lastInChain;
		producerLock.unlock();
	}

	/// Removes and returns the oldest element, taking `consumerLock` first.
	/// Returns `T.init` when the queue is empty.
	T pop() {
		consumerLock.lock();
		return takeFrontAndUnlock();
	}

	/// Like `pop`, but uses `consumerLock.tryLock()` and returns `T.init`
	/// immediately when the lock is not obtained.
	T tryPop() {
		if (!consumerLock.tryLock()) {
			T varInit;
			return varInit;
		}
		return takeFrontAndUnlock();
	}
}