1 /**
2 Module with queue
3  */
4 module mutils.container_shared.shared_queue;
5 
6 import core.atomic;
7 import std.experimental.allocator;
8 import std.experimental.allocator.building_blocks.bitmapped_block : SharedBitmappedBlock;
9 import std.experimental.allocator.common : platformAlignment;
10 import std.experimental.allocator.mallocator : Mallocator;
11 import std.typecons : Flag, Yes, No;
12 
13 import mutils.thread;
14 
15 struct MyMallocator {
16 	@disable this(this);
17 
18 	auto make(T, Args...)(auto ref Args args) {
19 		return Mallocator.instance.make!T(args);
20 	}
21 
22 	void dispose(T)(ref T* obj) {
23 		Mallocator.instance.dispose(obj);
24 	}
25 }
26 
27 struct LowLockQueue(T, CType = int) {
28 	@disable this(this);
29 private:
30 	static struct Node {
31 		this(T val) {
32 			value = val;
33 		}
34 
35 		T value;
36 		align(64) Node* next; //atomic
37 	};
38 
39 	// for one consumer at a time
40 	align(64) Node* first;
41 	// shared among consumers
42 	//MutexSpinLock  Mutex consumerLock;
43 	MutexSpinLock consumerLock;
44 
45 	// for one producer at a time
46 	align(64) Node* last;
47 	// shared among producers
48 	MutexSpinLock producerLock;
49 
50 
51 	alias Allocator = SharedBitmappedBlock!(Node.sizeof, platformAlignment,
52 			Mallocator, No.multiblock);
53 
54 	Allocator* allocator;
55 
56 public:
57 	void initialize() {
58 		allocator = new Allocator(1024 * 1024 * 1024);
59 		first = allocator.make!(Node)(T.init);
60 		last = first;
61 		consumerLock.initialzie();
62 		producerLock.initialzie();
63 	}
64 
65 	void clear() {
66 		assert(empty == true);
67 	}
68 
69 	~this() {
70 		clear();
71 	}
72 
73 	bool empty() {
74 		bool isEmpty;
75 		//consumerLock.lock();
76 		isEmpty = first.next == null;
77 		//consumerLock.unlock();
78 		return isEmpty;
79 	}
80 
81 	void add(T t) {
82 		Node* tmp = allocator.make!(Node)(t);
83 
84 		producerLock.lock();
85 		last.next = tmp;
86 		last = tmp;
87 		producerLock.unlock();
88 	}
89 
90 	void add(T[] t) {
91 		if (t.length == 0) {
92 			return;
93 		}
94 		Node* firstInChain;
95 		Node* lastInChain;
96 		Node* tmp = allocator.make!(Node)(t[0]);
97 		firstInChain = tmp;
98 		lastInChain = tmp;
99 		foreach (n; 1 .. t.length) {
100 			tmp = allocator.make!(Node)(t[n]);
101 			lastInChain.next = tmp;
102 			lastInChain = tmp;
103 		}
104 
105 		producerLock.lock();
106 		last.next = firstInChain;
107 		last = lastInChain;
108 		producerLock.unlock();
109 
110 	}
111 
112 	T pop() {
113 		consumerLock.lock();
114 
115 		T varInit;
116 		Node* theFirst = first;
117 		Node* theNext = first.next;
118 		if (theNext != null) {
119 			T result = theNext.value;
120 			theNext.value = varInit;
121 			first = theNext;
122 			consumerLock.unlock();
123 
124 			allocator.dispose(theFirst);
125 			return result;
126 		}
127 
128 		consumerLock.unlock();
129 		return varInit;
130 	}
131 
132 	T tryPop() {
133 		T varInit;
134 		bool locked = consumerLock.tryLock();
135 		if (locked == false) {
136 			return varInit;
137 		}
138 
139 		Node* theFirst = first;
140 		Node* theNext = first.next;
141 		if (theNext != null) {
142 			T result = theNext.value;
143 			theNext.value = varInit;
144 			first = theNext;
145 			consumerLock.unlock();
146 
147 			allocator.dispose(theFirst);
148 			return result;
149 		}
150 
151 		consumerLock.unlock();
152 		return varInit;
153 	}
154 }