/**
This module implements a multithreaded job manager built on fibers (coroutines).
Thanks to fibers, any task can be suspended in the middle of execution and later resumed by this manager.
Fibers are bound to a single thread because of TLS issues and for performance reasons.
A short usage sketch follows the global jobManager declaration below.
 */

module mutils.job_manager.manager_multithreaded;

import core.atomic;
import core.stdc.stdio;
import core.stdc.stdlib : rand;
import std.functional : toDelegate;
import std.stdio;

import mutils.container.vector;
import mutils.container_shared.shared_queue;
import mutils.job_manager.fiber_cache;
import mutils.job_manager.manager_utils;
import mutils.job_manager.utils;
import mutils.thread : Fiber, Thread, Semaphore, instructionPause;

// Default number of worker threads used when no explicit count is given
enum threadsPerCPU = 4;

// Low-lock queues holding jobs waiting to be executed and fibers waiting to be resumed
alias JobVector = LowLockQueue!(JobDelegate*);
alias FiberVector = LowLockQueue!(FiberData);

__gshared JobManager jobManager;
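
/*
A minimal usage sketch (hypothetical caller code, not part of this module; it assumes
only that JobDelegate from mutils.job_manager.manager_utils is a `void delegate()` alias):

	void firstJob() {
		// Runs as a fiber inside the manager. More jobs can be queued from here
		// with jobManager.addJob/addJobs; note that the JobDelegate pointed to
		// must stay alive until the job has been executed.
	}

	void main() {
		// Spawns the worker threads, runs firstJob as the first job and shuts
		// the manager down after firstJob returns and sets the end flag.
		jobManager.startMainLoop(&firstJob, 4);
	}
*/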

struct JobManager {
	struct DebugHelper {
		// Each counter is aligned to its own cache line to avoid false sharing
		align(64) shared uint jobsAdded;
		align(64) shared uint jobsDone;
		align(64) shared uint fibersAdded;
		align(64) shared uint fibersDone;

		void resetCounters() {
			atomicStore(jobsAdded, 0);
			atomicStore(jobsDone, 0);
			atomicStore(fibersAdded, 0);
			atomicStore(fibersDone, 0);
		}

		// The counters below are incremented only in debug builds
		void jobsAddedAdd(int num = 1) {
			debug atomicOp!"+="(jobsAdded, num);
		}

		void jobsDoneAdd(int num = 1) {
			debug atomicOp!"+="(jobsDone, num);
		}

		void fibersAddedAdd(int num = 1) {
			debug atomicOp!"+="(fibersAdded, num);
		}

		void fibersDoneAdd(int num = 1) {
			debug atomicOp!"+="(fibersDone, num);
		}

	}

	int threadsCount;
	DebugHelper debugHelper;
	// Jobs management
	private int addJobToQueueNum;
	private Vector!JobVector waitingJobs;
	// Fibers management
	private Vector!FiberTLSCache fibersCache;
	private Vector!FiberVector waitingFibers;
	// Thread management
	private Vector!Thread threadPool;
	private Vector!Semaphore semaphores;
	private bool exit;

	private void initialize(uint threadsCount = 0) {
		exit = false;

		if (threadsCount == 0)
			threadsCount = threadsPerCPU;
		if (threadsCount == 0)
			threadsCount = 4;

		this.threadsCount = threadsCount;

		waitingFibers.length = threadsCount;
		threadPool.length = threadsCount;
		waitingJobs.length = threadsCount;
		semaphores.length = threadsCount;
		fibersCache.length = threadsCount;

		foreach (uint i; 0 .. threadsCount) {
			waitingFibers[i].initialize();
			semaphores[i].initialize();
			threadPool[i].threadNum = i;
			threadPool[i].setDg(&threadRunFunction);
			waitingJobs[i].initialize();
		}

		version (Android)
			rt_init();
	}

	void clear() {
		foreach (i; 0 .. threadsCount) {
			waitingJobs[i].clear();
			waitingFibers[i].clear();
			fibersCache[i].clear();
			semaphores[i].destroy();
		}
		waitingFibers.clear();
		waitingJobs.clear();
		threadPool.clear();
		fibersCache.clear();
		semaphores.clear();
	}

	void start() {
		foreach (ref thread; threadPool) {
			thread.start();
		}
	}

	void startMainLoop(void function() mainLoop, uint threadsCount = 0) {
		startMainLoop(mainLoop.toDelegate, threadsCount);
	}

	void startMainLoop(JobDelegate mainLoop, uint threadsCount = 0) {
		align(64) shared bool endLoop = false;

		static struct NoGcDelegateHelper {
			JobDelegate del;
			shared bool* endPointer;

			this(JobDelegate del, ref shared bool end) {
				this.del = del;
				endPointer = &end;
			}

			void call() {
				del();
				atomicStore(*endPointer, true);
			}
		}

		NoGcDelegateHelper helper = NoGcDelegateHelper(mainLoop, endLoop);
		initialize(threadsCount);
		auto del = &helper.call;
		start();
		addJob(&del);
		waitForEnd(endLoop);
		end();
	}

	void waitForEnd(ref shared bool end) {
		bool wait = true;
		do {
			wait = !atomicLoad(end);
			foreach (ref th; threadPool) {
				if (!th.isRunning) {
					wait = false;
				}
			}
			Thread.sleep(10);
		}
		while (wait);
	}

	void end() {
		exit = true;
		foreach (i; 0 .. threadsCount) {
			semaphores[i].post();
		}
		foreach (ref thread; threadPool) {
			thread.join();
		}
		version (Android)
			rt_close();
	}

	void addFiber(FiberData fiberData) {
		while (fiberData.fiber.state == Fiber.State.EXEC) {
			instructionPause();
		}
		assertKM(waitingFibers.length == threadPool.length);
		assertKM(fiberData.fiber.state != Fiber.State.TERM && fiberData.fiber.state != Fiber.State.EXEC); //   - cannot be added because addThisFiberAndYield violates this assertion
		debugHelper.fibersAddedAdd();
		waitingFibers[fiberData.threadNum].add(fiberData);
		semaphores[fiberData.threadNum].post();
	}

	// Only for tests - it is pointless for a fiber to add itself only to be immediately resumed
	void addThisFiberAndYield(FiberData thisFiber) {
		//writeln("addThisFiberAndYield");
		debugHelper.fibersAddedAdd();
		waitingFibers[thisFiber.threadNum].add(thisFiber);
		semaphores[thisFiber.threadNum].post();
		Fiber.yield();
	}

	void addJob(JobDelegate* del) {
		debugHelper.jobsAddedAdd();

		// Distribute jobs across the per-thread queues round-robin
		int queueNum = addJobToQueueNum % threadsCount;
		waitingJobs[queueNum].add(del);
		semaphores[queueNum].post();
		addJobToQueueNum++;
	}

	void addJobs(JobDelegate*[] dels) {
		debugHelper.jobsAddedAdd(cast(int) dels.length);

		// Split the batch evenly between all queues, then round-robin any remainder
		int part = cast(int) dels.length / threadsCount;
		if (part > 0) {
			foreach (i, ref wj; waitingJobs) {
				wj.add(dels[i * part .. (i + 1) * part]);

				foreach (kkk; 0 .. part) {
					semaphores[i].post();
				}
			}
			dels = dels[part * waitingJobs.length .. $];
		}
		foreach (del; dels) {
			int queueNum = addJobToQueueNum % threadsCount;
			waitingJobs[queueNum].add(del);
			semaphores[queueNum].post();
			addJobToQueueNum++;
		}
	}

	void addJobAndYield(JobDelegate* del) {
		addJob(del);
		Fiber.yield();
	}

	void addJobsAndYield(JobDelegate*[] dels) {
		addJobs(dels);
		Fiber.yield();
	}

	Fiber allocateFiber(JobDelegate del, int threadNum) {
		Fiber fiber = fibersCache[threadNum].getData();
		assertKM(fiber.state == Fiber.State.TERM);
		assertKM(fiber.myThreadNum == jobManagerThreadNum);
		fiber.reset(del);
		return fiber;
	}

	void deallocateFiber(Fiber fiber, int threadNum) {
		fiber.threadStart = null;
		fibersCache[threadNum].removeData(fiber);
	}

	// Prefer resuming a waiting fiber; otherwise start a new job from this thread's queue
	Fiber getFiberOwnerThread(int threadNum) {
		Fiber fiber;
		FiberData fd = waitingFibers[threadNum].pop();
		if (fd != FiberData.init) {
			fiber = fd.fiber;
			debugHelper.fibersDoneAdd();
		} else {
			JobDelegate* job = waitingJobs[threadNum].pop();
			if (job !is null) {
				debugHelper.jobsDoneAdd();
				fiber = allocateFiber(*job, threadNum);
			}
		}
		return fiber;
	}

	Fiber getFiberThiefThread(int threadNum) {
		Fiber fiber;
		foreach (thSteal; 0 .. threadsCount) {
			if (thSteal == threadNum) {
				continue; // Do not steal from yourself
			}
			if (!semaphores[thSteal].tryWait()) {
				continue;
			}
			JobDelegate* job = waitingJobs[thSteal].pop();
			if (job !is null) {
				debugHelper.jobsDoneAdd();
				fiber = allocateFiber(*job, threadNum);
				break;
			} else {
				semaphores[thSteal].post(); // Cannot steal it, give the owner a chance to take it
			}
		}
		return fiber;
	}

	void threadRunFunction() {
		Fiber.initializeStatic();
		int threadNum = jobManagerThreadNum;
		while (!exit) {
			Fiber fiber;
			if (semaphores[threadNum].tryWait()) {
				fiber = getFiberOwnerThread(threadNum);
			} else {
				fiber = getFiberThiefThread(threadNum);
				if (fiber is null) {
					// Thread does not have its own job and cannot steal one, so wait for a job
					// semaphores[threadNum].wait(); There is a deadlock in some cases. For now use timedWait instead of wait
					bool ok = semaphores[threadNum].timedWait(1_000_000);

					if (!ok) {
						//writeln("Semaphore 1s timed out...");
					}
					fiber = getFiberOwnerThread(threadNum);
				}
			}

			// Nothing to do
			if (fiber is null) {
				continue;
			}
			// Do the job
			assertKM(fiber.state == Fiber.State.HOLD);
			fiber.call();

			// Reuse fiber
			if (fiber.state == Fiber.State.TERM) {
				deallocateFiber(fiber, threadNum);
			}
		}
	}

}
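
// A minimal end-to-end sketch of driving the manager, disabled with version (none)
// so it is never compiled by default. The identifiers below are illustrative only;
// the sketch assumes JobDelegate (from mutils.job_manager.manager_utils) is a plain
// `void delegate()` alias, as the rest of this module does.
version (none) unittest {
	static void exampleMainLoop() {
		// Runs as the first job inside the manager. When it returns, the helper in
		// startMainLoop sets the end flag and the worker threads are shut down.
		printf("running inside the job manager\n");
	}

	// Spawn 4 worker threads, execute exampleMainLoop as a job, then shut down.
	jobManager.startMainLoop(&exampleMainLoop, 4);
}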