/**
 Module contains basic structures for the job manager, e.g. FiberData and Counter.
 It also contains structures/functions which extend the functionality of the job manager, like:
 - UniversalJob - job with parameters and a return value
 - UniversalJobGroup - group of jobs
 - multithreaded - makes foreach execute in parallel
 */
module mutils.job_manager.manager_utils;

import core.atomic;
import core.stdc.stdio;
import core.stdc.string : memcpy, memset;
import std.algorithm : map;
import std.experimental.allocator;
import std.experimental.allocator.mallocator;
import std.traits : Parameters;
import std.stdio;

import mutils.container.vector;
import mutils.job_manager.manager;
import mutils.job_manager.utils;
import mutils.thread;
import mutils.thread : Fiber;

alias jobManagerThreadNum = Thread.getThisThreadNum; //thread local var

alias JobDelegate = void delegate();

struct FiberData {
	Fiber fiber;
	uint threadNum;
}

FiberData getFiberData() {
	Fiber fiber = Fiber.getThis();
	//printf("getFiberData fiber: %p\n", fiber);
	assertKM(fiber !is null);
	return FiberData(fiber, jobManagerThreadNum);
}

struct Counter {
	align(64) shared int count;
	align(64) FiberData waitingFiber;

	this(uint count) {
		this.count = count;
	}

	bool countedToZero() {
		return atomicLoad(count) == 0;
	}

	void decrement() {
		assertKM(atomicLoad(count) >= 0);

		auto num = atomicOp!"-="(count, 1);
		if (num == 0 && waitingFiber.fiber !is null) {
			jobManager.addFiber(waitingFiber);
		}

	}
}
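
// Sketch (assumes a running job manager and a current fiber): manual Counter wiring
// for a single job, mirroring what the free function callAndWait() at the end of
// this module does. The name exampleCounterWiring is hypothetical.
private void exampleCounterWiring() {
	void work() {
	}

	UniversalJob!(void delegate()) unJob;
	unJob.initialize(&work);
	unJob.runDel = &unJob.runWithCounter;

	Counter counter = Counter(1); // one decrement until the fiber is resumed
	counter.waitingFiber = getFiberData();
	unJob.counter = &counter;
	jobManager.addJobAndYield(&unJob.runDel); // yields until work() has run
}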

struct UniversalJob(Delegate) {
	alias UnDelegate = UniversalDelegate!Delegate;
	UnDelegate unDel; // user function to run, with parameters and return value
	JobDelegate runDel; // wrapper which decrements the counter at the end
	Counter* counter;
	void runWithCounter() {
		assertKM(counter !is null);
		unDel.callAndSaveReturn();
		static if (multithreadedManagerON) {
			counter.decrement();
		}
	}
	// 'this' has to be allocated by Mallocator
	void runAndDeleteMyself() {
		unDel.callAndSaveReturn();
		Mallocator.instance.dispose(&this);
	}

	void initialize(Delegate del, Parameters!(Delegate) args) {
		unDel = makeUniversalDelegate!(Delegate)(del, args);
		//runDel=&run;
	}

}
// It is faster to add an array of jobs than to add jobs one by one
struct UniversalJobGroupNew {
	enum int fiberWillYieldNum = 100000;
	alias Delegate = void delegate();
	//string name;
	bool runOnJobsDone;
	bool spawnOnDependenciesFulfilled = true;
	align(64) shared int dependicesWaitCount;
	Vector!(UniversalJobGroupNew*) children;

	Vector!Job jobs;
	Vector!(JobDelegate*) jobPointers;

	align(64) shared int countJobsToBeDone;
	align(64) FiberData waitingFiber;

	static struct Job {
		UniversalJobGroupNew* group;
		void function(ubyte*) delegateUNB;
		Delegate delegateJob; // JobManager takes a pointer to a delegate, so the callJob delegate (&callJob) is stored here
		ubyte[64] memoryForUserDelegate;

		void callJob() {
			delegateUNB(memoryForUserDelegate.ptr);

			auto num = atomicOp!"-="(group.countJobsToBeDone, 1);
			if (num == 0) {
				group.onJobsCounterZero();
			}

			// waitForCompletion() added fiberWillYieldNum to say: "I will wait to be
			// resumed", so make sure it is resumed by passing true
			if (num == fiberWillYieldNum) {
				group.onJobsCounterZero(true);
			}
		}

	}

	this(int jobsNum) {
		jobs.reserve(jobsNum);
	}

	void dependantOn(UniversalJobGroupNew* parent) {
		parent.children ~= &this;
		atomicOp!"+="(dependicesWaitCount, 1);
	}

	// runOnJobsDoneOverride is used when runOnJobsDone is being set but, due to multithreading, might not be visible here yet
	void onJobsCounterZero(bool runOnJobsDoneOverride = false) {
		decrementChildrenDependices();

		if (runOnJobsDone || runOnJobsDoneOverride) {
			jobManager.addFiber(waitingFiber);
		}
	}

	void decrementChildrenDependices() {
		foreach (UniversalJobGroupNew* group; children) {
			assertKM(atomicLoad(group.dependicesWaitCount) >= 0);

			auto num = atomicOp!"-="(group.dependicesWaitCount, 1);
			if (num == 0) {
				group.onDependicesCounterZero();
			}

		}
	}

	void onDependicesCounterZero() {
		if (spawnOnDependenciesFulfilled) {
			start();
		}
	}

	void start() {
		if (jobs.length != 0) {
			setUpJobs();
			jobManager.addJobs(jobPointers[]);
		} else {
			// Immediately call group end
			onJobsCounterZero();
		}
	}

	auto callAndWait() {
		if (jobs.length != 0) {
			// Add jobs and wait
			runOnJobsDone = true;
			waitingFiber = getFiberData();
			setUpJobs();
			jobManager.addJobsAndYield(jobPointers[]);
		} else {
			// Immediately call group end
			onJobsCounterZero();
		}
	}

	void waitForCompletion() {
		if (atomicLoad(dependicesWaitCount) == 0 && atomicLoad(countJobsToBeDone) == 0) {
			return;
		}
		waitingFiber = getFiberData();
		auto num = atomicOp!"+="(countJobsToBeDone, fiberWillYieldNum);
		if (atomicLoad(dependicesWaitCount) == 0 && num == fiberWillYieldNum) {
			// The decrementer saw 0 jobs remaining, so it might not resume us if we yielded now
			return;
		}
		runOnJobsDone = true;
		Fiber.yield();
	}

	void add(DG)(DG del, Parameters!(DG) args) {
		auto und = UniversalDelegate!(typeof(del))(del, args);
		ubyte[] delBytes = und.toBytes();

		Job job;
		job.group = &this;
		job.memoryForUserDelegate[0 .. delBytes.length] = delBytes[0 .. delBytes.length];
		job.delegateUNB = &und.callFromBytes;

		jobs ~= job;

		static assert(und.sizeof <= Job.memoryForUserDelegate.length);
	}

	auto setUpJobs() {
		atomicOp!"+="(countJobsToBeDone, cast(int) jobs.length);
		jobPointers.length = jobs.length;
		foreach (i, ref jj; jobs) {
			jj.delegateJob = &jj.callJob;
			jobPointers[i] = &jj.delegateJob;
		}
	}
}
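
// Example (a sketch, not part of the API): wiring a dependency between two groups.
// Assumes the job manager is already running and that this code executes inside a
// fiber, since waitForCompletion() may yield the current fiber. The name
// exampleUniversalJobGroupNew is hypothetical.
private void exampleUniversalJobGroupNew() {
	shared int sum = 0;
	void work() {
		atomicOp!"+="(sum, 1);
	}

	auto groupA = UniversalJobGroupNew(2);
	auto groupB = UniversalJobGroupNew(1);
	groupB.dependantOn(&groupA); // groupB will not start before groupA is done

	groupA.add(&work);
	groupA.add(&work);
	groupB.add(&work);

	groupA.start(); // groupB is spawned automatically when groupA's jobs finish
	groupB.waitForCompletion(); // yields this fiber until groupB is done
}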

// It is faster to add an array of jobs than to add jobs one by one
struct UniversalJobGroup(Delegate) {
	alias DelegateOnEnd = void delegate();
	alias UnJob = UniversalJob!(Delegate);
	Counter counter;
	uint jobsAdded;
	UnJob[] unJobs;
	JobDelegate*[] dels;

	@disable this();
	@disable this(this);

	this(uint jobsNum) {
		mallocatorAllocate(jobsNum);
	}

	~this() {
		mallocatorDeallocate();
	}

	void add(Delegate del, Parameters!(Delegate) args) {
		unJobs[jobsAdded].initialize(del, args);
		jobsAdded++;
	}

	// Returns data like getReturnData()
	auto callAndWait() {
		setUpJobs();
		counter.waitingFiber = getFiberData();
		jobManager.addJobsAndYield(dels[0 .. jobsAdded]);
		static if (UnJob.UnDelegate.hasReturn) {
			return getReturnData();
		}
	}

	bool areJobsDone() {
		return counter.countedToZero();
	}

	/// Returns a range, so you can allocate the result as you want.
	/// But remember: the returned data lives only as long as this object.
	auto getReturnData()() {
		static assert(UnJob.UnDelegate.hasReturn);
		assertKM(areJobsDone);
		return unJobs.map!(a => a.unDel.result);
	}

	auto start() {
		setUpJobs();
		jobManager.addJobs(dels[0 .. jobsAdded]);
	}

private:
	auto setUpJobs() {
		counter.count = jobsAdded;
		foreach (i, ref unJob; unJobs[0 .. jobsAdded]) {
			unJob.counter = &counter;
			unJob.runDel = &unJob.runWithCounter;
			dels[i] = &unJob.runDel;
		}
	}

	void mallocatorAllocate(uint jobsNum) {
		unJobs = Mallocator.instance.makeArray!(UnJob)(jobsNum);
		dels = Mallocator.instance.makeArray!(JobDelegate*)(jobsNum);
	}

	void mallocatorDeallocate() {
		memset(unJobs.ptr, 0, UnJob.sizeof * unJobs.length);
		memset(dels.ptr, 0, (JobDelegate*).sizeof * dels.length);
		Mallocator.instance.dispose(unJobs);
		Mallocator.instance.dispose(dels);
	}
}
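
// Example (a sketch, assuming a running job manager and execution inside a fiber):
// a group of jobs with return values. The name exampleUniversalJobGroup is hypothetical.
private void exampleUniversalJobGroup() {
	int square(int x) {
		return x * x;
	}

	alias Del = int delegate(int);
	auto group = UniversalJobGroup!Del(3); // Mallocator-backed space for 3 jobs
	foreach (int i; 0 .. 3) {
		group.add(&square, i);
	}
	// callAndWait() yields this fiber; because Del has a return type it returns
	// the same lazy range as getReturnData(), valid only while 'group' is alive
	auto results = group.callAndWait();
}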

auto callAndWait(Delegate)(Delegate del, Parameters!(Delegate) args) {
	UniversalJob!(Delegate) unJob;
	unJob.initialize(del, args);
	unJob.runDel = &unJob.runWithCounter;
	Counter counter;
	counter.count = 1;
	counter.waitingFiber = getFiberData();
	unJob.counter = &counter;
	jobManager.addJobAndYield(&unJob.runDel);
	static if (unJob.unDel.hasReturn) {
		return unJob.unDel.result;
	}
}

auto callAndNothing(Delegate)(Delegate del, Parameters!(Delegate) args) {
	static assert(!UniversalJob!(Delegate).UnDelegate.hasReturn);
	UniversalJob!(Delegate)* unJob = Mallocator.instance.make!(UniversalJob!(Delegate));
	unJob.initialize(del, args);
	unJob.runDel = &unJob.runAndDeleteMyself;
	jobManager.addJob(&unJob.runDel);
}
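
// Example (a sketch, assuming a running job manager): the two single-job helpers.
// callAndWait() must be called from a fiber because it yields; callAndNothing()
// is fire-and-forget. The name exampleSingleJobs is hypothetical.
private void exampleSingleJobs() {
	int add(int a, int b) {
		return a + b;
	}

	void log() {
		printf("job done\n");
	}

	int sum = callAndWait(&add, 2, 3); // yields this fiber, then returns 5
	callAndNothing(&log); // schedules the job and returns immediately
}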

auto multithreaded(T)(T[] slice) {

	static struct Tmp {
		import std.traits : ParameterTypeTuple;

		T[] array;
		int opApply(Dg)(scope Dg dg) {
			static assert(ParameterTypeTuple!Dg.length == 1 || ParameterTypeTuple!Dg.length == 2);
			enum hasI = ParameterTypeTuple!Dg.length == 2;
			static if (hasI)
				alias IType = ParameterTypeTuple!Dg[0];
			static struct NoGcDelegateHelper {
				Dg del;
				T[] arr;
				static if (hasI)
					IType iStart;

				void call() {
					foreach (int i, ref element; arr) {
						static if (hasI) {
							IType iSend = iStart + i;
							int result = del(iSend, element);
						} else {
							int result = del(element);
						}
						assertKM(result == 0,
								"Can't use break, continue, etc. in multithreaded foreach");
					}
				}
			}

			enum partsNum = 16; // constant number == easy usage of stack
			if (array.length < partsNum) {
				foreach (int i, ref element; array) {
					static if (hasI) {
						int result = dg(i, element);
					} else {
						int result = dg(element);
					}
					assertKM(result == 0, "Can't use break, continue, etc. in multithreaded foreach");

				}
			} else {
				NoGcDelegateHelper[partsNum] helpers;
				uint step = cast(uint) array.length / partsNum;

				alias ddd = void delegate();
				UniversalJobGroup!ddd group = UniversalJobGroup!ddd(partsNum);
				foreach (int i; 0 .. partsNum - 1) {
					helpers[i].del = dg;
					helpers[i].arr = array[i * step .. (i + 1) * step];
					static if (hasI)
						helpers[i].iStart = i * step;
					group.add(&helpers[i].call);
				}
				helpers[partsNum - 1].del = dg;
				helpers[partsNum - 1].arr = array[(partsNum - 1) * step .. array.length];
				static if (hasI)
					helpers[partsNum - 1].iStart = (partsNum - 1) * step;
				group.add(&helpers[partsNum - 1].call);

				group.callAndWait();
			}
			return 0;

		}
	}

	Tmp tmp;
	tmp.array = slice;
	return tmp;
}
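
// Example (a sketch, assuming a running job manager and a current fiber, since the
// parallel path uses UniversalJobGroup.callAndWait() which yields): parallel foreach
// over a slice. The name exampleMultithreadedForeach is hypothetical.
private void exampleMultithreadedForeach() {
	int[1024] data;
	foreach (int i, ref el; data[].multithreaded) {
		el = i * 2; // runs in up to 16 parallel chunks; break/continue are not allowed
	}
}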