1 /**
2 Module with queue
3  */
4 module mutils.container_shared.shared_queue;
5 
6 import core.atomic;
7 import std.experimental.allocator;
8 import std.experimental.allocator.mallocator;
9 
10 import mutils.container_shared.shared_allocator;
11 //algorithm from  http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/DDJ/2008/0811/081001hs01/081001hs01.html
12 //By Herb Sutter
13 
//Probably the fastest option when the resource is not contended
//LockType is mainly used to distinguish operations in a profiler (different function names if not inlined)
/**
 * Queue guarded by two spin locks: one serializes producers, the other serializes consumers.
 *
 * The elements live in a singly linked list that always begins with a dummy node:
 * `first` points at the dummy and `first.next` is the oldest queued element
 * (null when the queue is empty). `last` points at the most recently added node.
 *
 * Params:
 *  T = type of the stored elements
 *  LockType = type of the two lock flags; its values are produced with cast(LockType)false / cast(LockType)true
 */
class LowLockQueue(T,LockType=bool) {
private:
	/// Single list node; the node `first` points at never carries a live value.
	static struct Node {
		this( T val )  {
			value=val;
		}
		T value;
		// NOTE(review): marked "atomic" by the author, but all accesses below are plain loads/stores - confirm this is sufficient on the targeted platforms
		align (64)  Node* next;//atomic
	}

	/// Total number of elements handed to add(); updated atomically after each add.
	shared uint elementsAdded;
	/// Total number of elements successfully returned by pop(); updated atomically.
	shared uint elementsPopped;
	// for one consumer at a time
	align (64)  Node* first;
	// shared among consumers
	align (64) shared LockType consumerLock;

	// for one producer at a time
	align (64)  Node* last;
	// shared among producers
	align (64) shared LockType producerLock;//atomic


	alias Allocator=BucketAllocator!(Node.sizeof);
	//alias Allocator=MyMallcoator;
	//alias Allocator=MyGcAllcoator;
	/// Allocator used for every Node; itself created with Mallocator in the constructor.
	Allocator allocator;

public:
	/// Creates the node allocator, the initial dummy node and marks both locks as free.
	this() {
		allocator=Mallocator.instance.make!Allocator();
		first = last =  allocator.make!(Node)( T.init );
		// go through LockType explicitly so non-bool lock types are initialized the same way they are later compared
		atomicStore(consumerLock,cast(LockType)false);
		atomicStore(producerLock,cast(LockType)false);
	}

	/// Disposes the node allocator. Nodes still linked in the list are not disposed individually here.
	~this(){
		Mallocator.instance.dispose(allocator);
	}


	/**
	 * Returns: true when no node follows the dummy node.
	 *
	 * Reads `first` without taking the consumer lock.
	 */
	bool empty(){
		return (first.next == null);
	}

	/**
	 * Appends a single element at the end of the queue.
	 *
	 * The node is allocated before the producer lock is taken, so the lock is
	 * held only for the two pointer updates.
	 *
	 * Params:
	 *  t = value to enqueue
	 */
	void add( T  t ) {
		Node* tmp = allocator.make!(Node)( t );
		while( !cas(&producerLock,cast(LockType)false,cast(LockType)true )){ } 	// acquire exclusivity
		last.next = tmp;		 		// publish to consumers
		last = tmp;		 		// swing last forward
		atomicStore(producerLock,cast(LockType)false);		// release exclusivity
		atomicOp!"+="(elementsAdded,1);

	}

	/**
	 * Appends all elements of a slice, in order, under a single producer lock acquisition.
	 *
	 * The chain of nodes is built privately first and then linked to the queue
	 * in one step. An empty slice leaves the queue untouched.
	 *
	 * Params:
	 *  t = values to enqueue
	 */
	void add( T[]  t ) {
		// nothing to link; indexing t[0] below would be out of bounds
		if( t.length==0 ){
			return;
		}

		Node* firstInChain = allocator.make!(Node)( t[0] );
		Node* lastInChain = firstInChain;
		foreach(n;1..t.length){
			Node* tmp = allocator.make!(Node)( t[n] );
			lastInChain.next=tmp;
			lastInChain=tmp;
		}
		while( !cas(&producerLock,cast(LockType)false,cast(LockType)true )){ } 	// acquire exclusivity
		last.next = firstInChain;		 		// publish to consumers
		last = lastInChain;		 		// swing last forward
		atomicStore(producerLock,cast(LockType)false);		// release exclusivity
		atomicOp!"+="(elementsAdded,t.length);

	}



	/**
	 * Removes and returns the oldest element.
	 *
	 * Returns: the dequeued value, or T.init when the queue is empty. A stored
	 * value equal to T.init cannot be told apart from the "empty" result.
	 */
	T pop(  ) {
		while( !cas(&consumerLock,cast(LockType)false,cast(LockType)true ) ) { }	 // acquire exclusivity


		Node* theFirst = first;
		Node* theNext = first.next;
		if( theNext != null ) { // if queue is nonempty
			T result = theNext.value;	 	       	// take it out
			theNext.value = T.init; 	       	// of the Node; theNext becomes the new dummy
			first = theNext;		 	       	// swing first forward
			atomicStore(consumerLock,cast(LockType)false);	       	// release exclusivity
			atomicOp!"+="(elementsPopped,1);

			// the old dummy is unlinked by now, so it is disposed outside the lock
			allocator.dispose(theFirst);
			return result;	 		// and report success
		}

		atomicStore(consumerLock,cast(LockType)false);       	// release exclusivity
		return T.init; 	// report queue was empty
	}
}
111 
/**
 * Stress test for LowLockQueue!int.
 *
 * Each worker repeatedly adds a random number of elements (single adds plus
 * six-element slice adds) and then pops exactly as many as it added, checking
 * that every popped value is 1. Afterwards the queue counters must match the
 * number of elements the workers reported and the queue must be empty.
 */
void testLLQ(){
	import mutils.job_manager.shared_utils;
	import std.random:uniform;
	import std.functional:toDelegate;
	static int[] tmpArr=[1,1,1,1,1,1];
	static shared uint addedElements;
	// the counter is static and survives between calls, while the queue below starts from zero every time
	atomicStore(addedElements,0u);
	__gshared LowLockQueue!int queue;
	queue=Mallocator.instance.make!(LowLockQueue!int);
	scope(exit)Mallocator.instance.dispose(queue);

	// Worker body: static so it can be turned into a delegate without capturing this frame.
	static void testLLQAdd(){
		uint popped;
		foreach(kk;0..1000){
			uint num=uniform(0,1000);
			// num single adds + num slice adds of tmpArr.length (6) elements each
			atomicOp!"+="(addedElements,num+num*6);
			foreach(i;0..num)queue.add(1);
			foreach(i;0..num)queue.add(tmpArr);
			foreach(i;0..num+num*6){
				popped=queue.pop();
				assert(popped==1);
			}
		}
	}
	// presumably runs the delegate concurrently on 4 threads and waits for them - confirm in mutils.job_manager.shared_utils
	testMultithreaded((&testLLQAdd).toDelegate,4);
	immutable uint totalAdded=atomicLoad(queue.elementsAdded);
	assert(totalAdded==atomicLoad(addedElements));
	assert(totalAdded==atomicLoad(queue.elementsPopped));
	// an empty queue consists of the dummy node only
	assert(queue.first.next==null);
	assert(queue.first==queue.last);
}