/**
Module with queue
 */
module mutils.container_shared.shared_queue;

import core.atomic;
import std.experimental.allocator;
import std.experimental.allocator.mallocator;

import mutils.container_shared.shared_allocator;
//algorithm from http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/DDJ/2008/0811/081001hs01/081001hs01.html
//By Herb Sutter

//Maybe the fastest for not contested resource
//lock type is mainly used to distinguish operations in profiler (different function names if not inlined)
/**
Linked-list queue guarded by two spin locks: one taken by producers
(`add`) and one taken by consumers (`pop`).

The list always contains one dummy node: `first` points at it and the
oldest queued element lives in `first.next`.

Params:
    T        = element type stored in the queue
    LockType = type of the two spin-lock flags (must accept `cast(LockType)false`
               and `cast(LockType)true`)
 */
class LowLockQueue(T,LockType=bool) {
private:
    /// Singly linked list node holding one element.
    static struct Node {
        this( T val ) {
            value=val;
        }
        T value;
        // NOTE(review): align(64) presumably keeps fields on separate cache lines - confirm.
        align (64) Node* next;//atomic
    }

    // Running totals, updated atomically after the corresponding lock is released.
    shared uint elementsAdded;
    shared uint elementsPopped;
    // for one consumer at a time
    align (64) Node* first;
    // shared among consumers
    align (64) shared LockType consumerLock;

    // for one producer at a time
    align (64) Node* last;
    // shared among producers
    align (64) shared LockType producerLock;//atomic


    alias Allocator=BucketAllocator!(Node.sizeof);
    //alias Allocator=MyMallcoator;
    //alias Allocator=MyGcAllcoator;
    Allocator allocator;

public:
    /// Creates the node allocator and the initial dummy node that both
    /// `first` and `last` point to; both locks start released.
    this() {
        allocator=Mallocator.instance.make!Allocator();
        first = last = allocator.make!(Node)( T.init );
        producerLock = consumerLock = false;
    }

    /// Disposes the node allocator.
    // NOTE(review): nodes still linked in the queue are not disposed one by one here;
    // presumably the allocator releases its memory as a whole - confirm.
    ~this(){
        Mallocator.instance.dispose(allocator);
    }


    /**
    Returns: `true` when the dummy node has no successor.

    NOTE(review): this reads `first.next` without taking `consumerLock`,
    so the result looks like it is only a hint while other threads
    add or pop - confirm callers do not rely on it being exact.
     */
    bool empty(){
        return (first.next == null);
    }

    /**
    Appends a single element to the back of the queue.

    Params:
        t = value to enqueue
     */
    void add( T t ) {
        // Allocate outside of the lock to keep the critical section short.
        Node* node = allocator.make!(Node)( t );
        while( !cas(&producerLock,cast(LockType)false,cast(LockType)true )){ } // acquire exclusivity
        last.next = node;                               // publish to consumers
        last = node;                                    // swing last forward
        atomicStore(producerLock,cast(LockType)false);  // release exclusivity
        atomicOp!"+="(elementsAdded,1);
    }

    /**
    Appends all elements of `t`, in order, with a single lock acquisition.

    The chain of nodes is built before the lock is taken and then linked
    in at once. An empty slice is a no-op.

    Params:
        t = values to enqueue
     */
    void add( T[] t ) {
        // Nothing to link; indexing t[0] below would be out of bounds.
        if( t.length == 0 ) {
            return;
        }

        Node* firstInChain = allocator.make!(Node)( t[0] );
        Node* lastInChain = firstInChain;
        foreach(n;1..t.length){
            Node* node = allocator.make!(Node)( t[n] );
            lastInChain.next=node;
            lastInChain=node;
        }
        while( !cas(&producerLock,cast(LockType)false,cast(LockType)true )){ } // acquire exclusivity
        last.next = firstInChain;                       // publish to consumers
        last = lastInChain;                             // swing last forward
        atomicStore(producerLock,cast(LockType)false);  // release exclusivity
        // The counter is a uint; make the narrowing from size_t explicit.
        atomicOp!"+="(elementsAdded,cast(uint)t.length);
    }



    /**
    Removes and returns the oldest element.

    Returns: the dequeued value, or `T.init` when the queue is empty
    (an empty queue is therefore indistinguishable from a stored `T.init`).
     */
    T pop( ) {
        while( !cas(&consumerLock,cast(LockType)false,cast(LockType)true ) ) { } // acquire exclusivity

        Node* theFirst = first;
        Node* theNext = first.next;
        if( theNext == null ) {                             // queue is empty
            atomicStore(consumerLock,cast(LockType)false);  // release exclusivity
            return T.init;                                  // report queue was empty
        }

        T result = theNext.value;                       // take it out
        theNext.value = T.init;                         // of the Node
        first = theNext;                                // theNext becomes the new dummy node
        atomicStore(consumerLock,cast(LockType)false);  // release exclusivity
        atomicOp!"+="(elementsPopped,1);

        // The old dummy node is no longer reachable through `first`; free it outside the lock.
        allocator.dispose(theFirst);
        return result;                                  // and report success
    }
}

/// Multithreaded stress test: four workers each enqueue elements (singly and
/// in batches of six) and then dequeue the same number, after which the queue
/// must be empty and the counters must agree.
void testLLQ(){
    import mutils.job_manager.shared_utils;
    import std.random:uniform;
    import std.functional:toDelegate;
    static int[] tmpArr=[1,1,1,1,1,1];
    static shared uint addedElements;
    __gshared LowLockQueue!int queue;
    queue=Mallocator.instance.make!(LowLockQueue!int);
    scope(exit)Mallocator.instance.dispose(queue);

    static void testLLQAdd(){
        uint popped;
        foreach(kk;0..1000){
            uint num=uniform(0,1000);
            // num single elements plus num batches of tmpArr (6 elements each)
            atomicOp!"+="(addedElements,num+num*6);
            foreach(i;0..num)queue.add(1);
            foreach(i;0..num)queue.add(tmpArr);
            // Pop exactly as many elements as this worker has just added.
            foreach(i;0..num+num*6){
                popped=queue.pop();
                assert(popped==1);
            }
        }
    }
    testMultithreaded((&testLLQAdd).toDelegate,4);
    assert(queue.elementsAdded==addedElements);
    assert(queue.elementsAdded==queue.elementsPopped);
    // Only the dummy node may remain.
    assert(queue.first.next==null);
    assert(queue.first==queue.last);
}