1 /**
2  Multithreated test may take some space so they are there.
3  */
4 module mutils.job_manager.manager_tests;
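
// A minimal sketch of how this module is driven (it mirrors test() at the
// bottom of the file; startMainLoop/clear/callAndWait/toDelegate are the
// entry points already used below, and the name myTests and the thread
// count 4 are illustrative only):
//
//     static void myTests() {
//         callAndWait!(void delegate())((&testUnique).toDelegate);
//     }
//     jobManager.startMainLoop(&myTests, 4);
//     jobManager.clear();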
5 
6 import core.atomic;
7 import core.simd;
8 import core.stdc.stdio;
9 import std.algorithm : sum;
10 import std.experimental.allocator;
11 import std.experimental.allocator.mallocator;
12 import std.functional : toDelegate;
13 
14 import mutils.job_manager.manager;
15 import mutils.job_manager.utils;
16 import mutils.thread : Fiber, Thread;
17 import mutils.time;
18 
19 import std.stdio;
20 
/// One job that performs a single fiber yield through the job manager.
22 void simpleYield() {
23 	auto fiberData = getFiberData();
24 	foreach (i; 0 .. 1) {
25 		jobManager.addThisFiberAndYield(fiberData);
26 	}
27 }
28 
29 void simpleSleep(uint msecs) {
30 	if (msecs == 0) {
31 		return;
32 	}
33 	Thread.sleep(msecs);
34 }
35 
36 void activeSleep(uint u_seconds) {
37 	StopWatch sw;
38 	sw.start();
39 	while (sw.usecs < u_seconds) {
40 		//writeln("kk");
	} // for a 10 us sleep this loop iterates ~120 times
42 	sw.stop();
43 
44 }
45 
46 void makeTestJobsFrom(void function() fn, uint num) {
47 	makeTestJobsFrom(fn.toDelegate, num);
48 }
49 
50 void makeTestJobsFrom(JobDelegate deleg, uint num) {
51 	UniversalJobGroup!JobDelegate group = UniversalJobGroup!JobDelegate(num);
52 	foreach (int i; 0 .. num) {
53 		group.add(deleg);
54 	}
55 	group.callAndWait();
56 }
57 
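/// After every yield the fiber must resume on the same thread it started on.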
58 void testFiberLockingToThread() {
59 	auto id = Thread.getThisThreadNum();
60 	auto fiberData = getFiberData();
61 	foreach (i; 0 .. 1000) {
62 		jobManager.addThisFiberAndYield(fiberData);
63 		assertKM(id == Thread.getThisThreadNum());
64 	}
65 }
66 
// Returns how many jobs it has spawned (including recursive spawns).
int randomRecursionJobs(int deepLevel) {
70 	if (deepLevel == 0) {
71 		simpleYield();
72 		return 0;
73 	}
	int randNum = 7; // fixed fan-out (not actually random)
75 
76 	alias ddd = typeof(&randomRecursionJobs);
77 	UniversalJobGroup!ddd group = UniversalJobGroup!ddd(randNum);
78 	foreach (int i; 0 .. randNum) {
79 		group.add(&randomRecursionJobs, deepLevel - 1);
80 	}
81 	auto jobsRun = group.callAndWait();
	auto sumNum = sum(jobsRun) + randNum;
83 	return sumNum;
84 }
85 
// Checks that the job and fiber counters match the number of jobs spawned.
87 void testRandomRecursionJobs() {
88 	jobManager.debugHelper.resetCounters();
89 	int jobsRun = callAndWait!(typeof(&randomRecursionJobs))(&randomRecursionJobs, 5);
90 	assertM(jobManager.debugHelper.jobsAdded, jobsRun + 1);
	assertM(jobManager.debugHelper.jobsDone, jobsRun + 1);
92 	assertM(jobManager.debugHelper.fibersAdded, jobsRun + 2);
93 	assertM(jobManager.debugHelper.fibersDone, jobsRun + 2);
94 }
95 
96 //__gshared long best=0;
97 void testPerformance() {
98 	uint iterations = 1000;
99 	uint packetSize = 1000;
100 	StopWatch sw;
101 	sw.start();
102 	jobManager.debugHelper.resetCounters();
103 	alias ddd = typeof(&simpleYield);
104 	UniversalJobGroupNew group = UniversalJobGroupNew(packetSize);
105 	foreach (int i; 0 .. packetSize) {
106 		group.add!ddd(&simpleYield);
107 	}
108 	//int[] pp=	new int[100];
109 	foreach (uint i; 0 .. iterations) {
110 		group.callAndWait();
111 	}
112 
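	// Every job gets its own fiber (packetSize per iteration) and each
	// callAndWait parks the calling fiber once more, hence the
	// "+ iterations" in the fiber counters below.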
113 	assertM(jobManager.debugHelper.jobsAdded, iterations * packetSize);
114 	assertM(jobManager.debugHelper.jobsDone, iterations * packetSize);
115 	assertM(jobManager.debugHelper.fibersAdded, iterations * packetSize + iterations);
116 	assertM(jobManager.debugHelper.fibersDone, iterations * packetSize + iterations);
117 	sw.stop();
118 	long perMs = iterations * packetSize / sw.msecs;
119 	printf("Performacnce performacnce: %dms, perMs: %d\n", cast(int)(sw.msecs), cast(int)(perMs));
120 	//if(perMs>best)best=perMs;
121 
122 }
123 
124 shared int myCounter;
125 void testUnique() {
126 	import mutils.job_manager.debug_sink;
127 
128 	myCounter = 0;
129 	static void localYield() {
130 		auto fiberData = getFiberData();
131 		//DebugSink.add(atomicOp!"+="(myCounter,1));
132 		jobManager.addThisFiberAndYield(fiberData);
133 	}
134 
135 	jobManager.debugHelper.resetCounters();
136 	//DebugSink.reset();
137 	uint packetSize = 1000;
138 
139 	alias ddd = typeof(&localYield);
140 	UniversalJobGroup!ddd group = UniversalJobGroup!ddd(packetSize);
141 	foreach (int i; 0 .. packetSize) {
142 		group.add(&localYield);
143 	}
144 	group.callAndWait();
145 
146 	assertM(jobManager.debugHelper.jobsAdded, packetSize);
147 	assertM(jobManager.debugHelper.jobsDone, packetSize);
148 	assertM(jobManager.debugHelper.fibersAdded, packetSize + 1);
149 	assertM(jobManager.debugHelper.fibersDone, packetSize + 1);
150 	//DebugSink.verifyUnique(packetSize);
151 }
152 
153 void testPerformanceSleep() {
154 	uint partsNum = 1000;
155 	uint iterations = 60;
156 	uint u_secs = 13;
157 
158 	alias ddd = typeof(&activeSleep);
159 	UniversalJobGroupNew group = UniversalJobGroupNew(partsNum);
160 	foreach (int i; 0 .. partsNum) {
161 		group.add!ddd(&activeSleep, u_secs);
162 	}
163 	StopWatch sw;
164 	sw.start();
165 	foreach (i; 0 .. iterations) {
166 		group.callAndWait();
167 	}
168 
169 	sw.stop();
	// Approximate speedup: total busy-wait time issued vs. wall-clock time.
	result = cast(float) iterations * partsNum * u_secs / sw.usecs;
171 
172 }
173 
174 alias mat4 = float[16];
175 void mulMat(mat4[] mA, mat4[] mB, mat4[] mC) {
176 	assertKM(mA.length == mB.length && mB.length == mC.length);
	foreach (i; 0 .. mA.length) {
		mC[i][] = mA[i][] * mB[i][]; // element-wise product of the two inputs
	}
182 }
183 
184 __gshared float result;
185 __gshared float base = 1;
186 void testPerformanceMatrix() {
187 	import std.parallelism;
188 
189 	uint partsNum = 128;
190 	uint iterations = 100;
191 	uint matricesNum = 512 * 64;
192 	assertKM(matricesNum % partsNum == 0);
193 	mat4[] matricesA = Mallocator.instance.makeArray!mat4(matricesNum);
194 	mat4[] matricesB = Mallocator.instance.makeArray!mat4(matricesNum);
195 	mat4[] matricesC = Mallocator.instance.makeArray!mat4(matricesNum);
196 	scope (exit) {
197 		Mallocator.instance.dispose(matricesA);
198 		Mallocator.instance.dispose(matricesB);
199 		Mallocator.instance.dispose(matricesC);
200 	}
201 	StopWatch sw;
202 	sw.start();
203 	jobManager.debugHelper.resetCounters();
204 	uint step = matricesNum / partsNum;
205 
206 	alias ddd = typeof(&mulMat);
207 	UniversalJobGroupNew group = UniversalJobGroupNew(partsNum);
208 	foreach (int i; 0 .. partsNum) {
209 		group.add!ddd(&mulMat, matricesA[i * step .. (i + 1) * step],
210 				matricesB[i * step .. (i + 1) * step], matricesC[i * step .. (i + 1) * step]);
211 	}
212 	foreach (i; 0 .. iterations) {
213 		group.callAndWait();
214 	}
215 
216 	sw.stop();
217 	result = cast(float) iterations * matricesNum / sw.usecs;
218 	printf("Performacnce matrix: %dms\n", cast(int)(sw.msecs));
219 }
220 
221 void testForeach() {
222 	int[200] ints;
223 	shared uint sum = 0;
224 	foreach (ref int el; ints.multithreaded) {
225 		atomicOp!"+="(sum, 1);
226 		activeSleep(100); //simulate load for 100us
227 	}
228 	foreach (ref int el; ints.multithreaded) {
229 		activeSleep(100);
230 	}
231 	assertKM(sum == 200);
232 }
233 
234 void testGroupStart() {
235 	if (jobManager.threadsCount == 1) {
236 		return;
237 	}
238 	uint partsNum = 100;
239 
240 	alias ddd = typeof(&activeSleep);
241 	UniversalJobGroup!ddd group = UniversalJobGroup!ddd(partsNum);
242 	foreach (int i; 0 .. partsNum) {
243 		group.add(&activeSleep, 10);
244 	}
245 	group.start();
246 	activeSleep(10);
247 	assertKM(group.counter.count > 0 && !group.counter.countedToZero());
248 	Thread.sleep(1000);
249 	//activeSleep(1000_000);
250 	assertKM(group.areJobsDone);
251 
252 }
253 
void testGroupsDependenciesFuncA(int[] arr) {
255 	foreach (ref el; arr) {
256 		el = 1;
257 	}
258 }
259 
void testGroupsDependenciesFuncB(int[] arr) {
261 	foreach (ref el; arr) {
262 		el = 1_000_000;
263 	}
264 }
265 
void testGroupsDependenciesFuncC(int[] arr) {
267 	foreach (ref el; arr) {
268 		el *= 201;
269 	}
270 }
271 
void testGroupsDependenciesFuncD(int[] arr) {
273 	foreach (ref el; arr) {
274 		el -= 90;
275 	}
276 }
277 
void testGroupsDependenciesFuncE(int[] arrA, int[] arrB) {
279 	assertKM(arrA.length == arrB.length);
280 	foreach (i; 0 .. arrA.length) {
281 		arrA[i] += arrB[i];
282 		arrA[i] *= -1;
283 	}
284 }
285 
void testGroupsDependencies() {
287 
288 	enum uint partsNum = 512;
289 	enum uint elementsNum = 512 * 512;
290 	enum uint step = elementsNum / partsNum;
291 	int[] elements = Mallocator.instance.makeArray!int(elementsNum);
292 	int[] elements2 = Mallocator.instance.makeArray!int(elementsNum);
293 	scope (exit) {
294 		Mallocator.instance.dispose(elements);
295 		Mallocator.instance.dispose(elements2);
296 	}
297 	alias ddd = void function(int[]);
298 	alias dddSum = void function(int[], int[]);
299 
300 	auto groupA = UniversalJobGroupNew();
301 	auto groupB = UniversalJobGroupNew();
302 	auto groupC = UniversalJobGroupNew();
303 	auto groupD = UniversalJobGroupNew();
304 	auto groupE = UniversalJobGroupNew();
305 	/*groupA.name="groupA";
306 	groupB.name="groupB";
307 	groupC.name="groupC";
308 	groupD.name="groupD";
309 	groupE.name="groupE";*/
310 
311 	foreach (int i; 0 .. partsNum) {
312 		groupA.add!ddd(&testGroupsDependicesFuncA, elements[i * step .. (i + 1) * step]);
313 		groupB.add!ddd(&testGroupsDependicesFuncB, elements2[i * step .. (i + 1) * step]);
314 		groupC.add!ddd(&testGroupsDependicesFuncC, elements[i * step .. (i + 1) * step]);
315 		groupD.add!ddd(&testGroupsDependicesFuncD, elements[i * step .. (i + 1) * step]);
316 		groupE.add!dddSum(&testGroupsDependicesFuncE,
317 				elements[i * step .. (i + 1) * step], elements2[i * step .. (i + 1) * step]);
318 	}
319 
	// Dependency graph:
	// groupA -> groupC -> groupD -> |
	// groupB ->                     | groupE
323 
324 	//groupE.runOnJobsDone = true;
325 	groupE.dependantOn(&groupD);
326 	groupE.dependantOn(&groupB);
327 
328 	groupC.dependantOn(&groupA);
329 	groupD.dependantOn(&groupC);
330 
331 	groupA.start();
332 	groupB.start();
333 
334 	groupE.waitForCompletion();
335 
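	// Expected value per element: funcA writes 1, funcC multiplies by 201
	// (-> 201), funcD subtracts 90 (-> 111), then funcE adds the 1_000_000
	// from elements2 and negates the sum (-> -1_000_111).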
336 	foreach (el; elements) {
337 		assertM(el, -1_000_111);
338 	}
339 }
340 
341 void testUnbalancedGroups() {
342 
343 	enum uint groupsNum = 32;
344 	alias ddd = void function(uint);
345 
346 	auto groupEnd = UniversalJobGroupNew();
347 	UniversalJobGroupNew[groupsNum] groups;
348 
349 	foreach (ref group; groups) {
350 		group.add!ddd(&simpleSleep, 100);
351 		foreach (i; 0 .. 32) {
352 			group.add!ddd(&simpleSleep, 0);
353 		}
354 		groupEnd.dependantOn(&group);
355 	}
356 
357 	StopWatch sw;
358 	sw.start();
359 	foreach (ref group; groups) {
360 		group.start();
361 	}
362 
363 	groupEnd.waitForCompletion();
364 
365 	sw.stop();
366 	printf("Performacnce unbalanced: %dms\n", cast(int)(sw.msecs));
367 }
368 
369 void testWaitForGroup() {
370 	alias ddd = void function(uint);
371 
372 	UniversalJobGroupNew groupA;
373 	UniversalJobGroupNew groupEnd;
374 	/*groupA.name="groupA";
375 	groupEnd.name="groupEnd";*/
376 	groupEnd.dependantOn(&groupA);
377 	groupA.add!ddd(&simpleSleep, 0);
378 	groupA.start();
379 	//Thread.sleep(100);
380 	groupA.waitForCompletion();
381 	//Thread.sleep(100);
382 	//groupEnd.waitForCompletion();
383 }
384 
385 
386 void test(uint threadsNum = 16) {
387 	import core.memory;
388 
389 	GC.disable();
390 	static void startTest() {
391 		foreach (i; 0 .. 1) {
392 			alias UnDel = void delegate();
393 			testForeach();
394 			makeTestJobsFrom(&testFiberLockingToThread, 100);
395 			callAndWait!(UnDel)((&testUnique).toDelegate);
396 			callAndWait!(UnDel)((&testGroupsDependices).toDelegate);
397 			callAndWait!(UnDel)((&testWaitForGroup).toDelegate);
398 			callAndWait!(UnDel)((&testPerformance).toDelegate);	
399 			callAndWait!(UnDel)((&testPerformanceMatrix).toDelegate);
400 			callAndWait!(UnDel)((&testPerformanceSleep).toDelegate);
401 			callAndWait!(UnDel)((&testUnbalancedGroups).toDelegate);
402 			//callAndWait!(UnDel)((&testGroupStart).toDelegate);// Has to have long sleep
403 			callAndWait!(UnDel)((&testRandomRecursionJobs).toDelegate);
404 		}
405 		//writeln(best);
406 
407 	}
408 
409 	jobManager.startMainLoop(&startTest, threadsNum);
410 	jobManager.clear();
411 }
412 
413 void testScalability() {
414 	foreach (int i; 1 .. 44) {
415 		printf(" %d ", i);
416 		test(i);
417 	}
418 }
419 
420 unittest {
421 	//test(4);
422 }