/**
 Multithreaded tests may take some space, so they are kept in this separate module.
 */
module mutils.job_manager.manager_tests;

import core.atomic;
import core.memory;
import core.simd;
import core.thread : Thread,ThreadID,Fiber;

import std.algorithm : sum;
//Mallocator helpers (assumed to come from std.experimental.allocator; shared_utils may already provide them)
import std.experimental.allocator : make,makeArray,dispose;
import std.experimental.allocator.mallocator : Mallocator;
import std.functional : toDelegate;
import std.random : uniform;
import std.stdio : write,writeln,writefln;

import mutils.benchmark;
import mutils.job_manager.debug_sink;
import mutils.job_manager.manager;
import mutils.job_manager.shared_utils;
import mutils.job_manager.utils;


/// One Job and one Fiber.yield
void simpleYield(){
	auto fiberData=getFiberData();
	foreach(i;0..1){
		jobManager.addThisFiberAndYield(fiberData);
	}
}
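//Note (added): getFiberData presumably captures the current fiber and its thread;
//addThisFiberAndYield re-enqueues that fiber as a job and yields, so execution
//resumes only when a worker picks the fiber up again.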


void activeSleep(uint u_seconds){
	StopWatch sw;
	sw.start();
	while(sw.usecs<u_seconds){}//busy-wait; for 10us it iterates ~120 times
	sw.stop();
}

void veryDummy(){
	//foreach(i;0..10)writeln("asdasd");
}

void makeTestJobsFrom(void function() fn,uint num){
	makeTestJobsFrom(fn.toDelegate,num);
}

void makeTestJobsFrom(JobDelegate deleg,uint num){
	UniversalJobGroup!JobDelegate group=UniversalJobGroup!JobDelegate(num);
	foreach(int i;0..num){
		group.add(deleg);
	}
	group.callAndWait();
}

//A fiber, once yielded, must be resumed on the same thread it started on
void testFiberLockingToThread(){
	ThreadID id=Thread.getThis.id;
	auto fiberData=getFiberData();
	foreach(i;0..1000){
		jobManager.addThisFiberAndYield(fiberData);
		assert(id==Thread.getThis.id);
	}
}

//Returns how many jobs it has spawned
int randomRecursionJobs(int deepLevel){
	if(deepLevel==0){
		simpleYield();
		return 0;
	}
	int randNum=uniform(1,10);
	//randNum=10;

	alias ddd=typeof(&randomRecursionJobs);
	UniversalJobGroup!ddd group=UniversalJobGroup!ddd(randNum);
	foreach(int i;0..randNum){
		group.add(&randomRecursionJobs,deepLevel-1);
	}
	auto jobsRun=group.callAndWait();
	return sum(jobsRun)+randNum;
}
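//Note (added): every level spawns 1..9 children, so the return value counts all
//transitively spawned jobs: the direct children (randNum) plus everything the
//children themselves report via sum(jobsRun).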

//Checks that the debug counters agree with the number of recursively spawned jobs
void testRandomRecursionJobs(){
	jobManager.debugHelper.resetCounters();
	int jobsRun=callAndWait!(typeof(&randomRecursionJobs))(&randomRecursionJobs,5);
	assert(jobManager.debugHelper.jobsAdded==jobsRun+1);
	assert(jobManager.debugHelper.jobsDone==jobsRun+1);
	assert(jobManager.debugHelper.fibersAdded==jobsRun+2);
	assert(jobManager.debugHelper.fibersDone==jobsRun+2);
}
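//Note (added): the +1 presumably accounts for the root randomRecursionJobs job
//itself, and the +2 for the root job's fiber plus the caller's fiber parked
//inside callAndWait.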


void testPerformance(){
	uint iterations=1000;
	uint packetSize=100;
	StopWatch sw;
	sw.start();
	jobManager.debugHelper.resetCounters();
	alias ddd=typeof(&simpleYield);
	UniversalJobGroup!ddd group=UniversalJobGroup!ddd(packetSize);
	foreach(int i;0..packetSize){
		group.add(&simpleYield);
	}
	//int[] pp=	new int[100];
	foreach(i;0..iterations){
		group.callAndWait();
	}

	assertM(jobManager.debugHelper.jobsAdded,iterations*packetSize);
	assertM(jobManager.debugHelper.jobsDone ,iterations*packetSize);
	//each callAndWait also re-adds the caller's fiber once per iteration, hence the extra +iterations
	assertM(jobManager.debugHelper.fibersAdded,iterations*packetSize+iterations);
	assertM(jobManager.debugHelper.fibersDone ,iterations*packetSize+iterations);
	sw.stop();
	writefln( "Benchmark: %s*%s : %s[ms], %s[it/ms]",iterations,packetSize,sw.msecs,iterations*packetSize/sw.msecs);
}

void testUnique(){
	shared int myCounter;
	void localYield(){
		auto fiberData=getFiberData();
		DebugSink.add(atomicOp!"+="(myCounter,1));
		jobManager.addThisFiberAndYield(fiberData);
	}
	jobManager.debugHelper.resetCounters();
	DebugSink.reset();
	uint packetSize=1000;

	alias ddd=typeof(&localYield);
	UniversalJobGroup!ddd group=UniversalJobGroup!ddd(packetSize);
	foreach(int i;0..packetSize){
		group.add(&localYield);
	}
	group.callAndWait();

	assertM(jobManager.debugHelper.jobsAdded,packetSize);
	assertM(jobManager.debugHelper.jobsDone ,packetSize);
	assertM(jobManager.debugHelper.fibersAdded,packetSize+1);
	assertM(jobManager.debugHelper.fibersDone ,packetSize+1);
	DebugSink.verifyUnique(packetSize);
}
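//Note (added): every job atomically increments myCounter and records the result,
//so DebugSink.verifyUnique presumably checks that exactly packetSize distinct
//values (1..packetSize) were recorded, i.e. no increment was lost or duplicated.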


void testPerformanceSleep(){
	uint partsNum=1000;
	uint iterations=60;
	uint u_secs=13;

	alias ddd=typeof(&activeSleep);
	UniversalJobGroup!ddd group=UniversalJobGroup!ddd(partsNum);
	foreach(int i;0..partsNum){
		group.add(&activeSleep,u_secs);
	}
	StopWatch sw;
	sw.start();
	foreach(i;0..iterations){
		group.callAndWait();
	}
	sw.stop();
	result=cast(float)iterations*u_secs/sw.usecs;

	writefln( "BenchmarkSleep: %s*%s : %s[us], %s[it/us]  %s",iterations,u_secs,sw.usecs,cast(float)iterations*u_secs/sw.usecs,result/base);
}


alias mat4=float[16];
//import gl3n.linalg;
//Despite the name this is a per-component (element-wise) product; as a benchmark workload that is sufficient
void mulMat(mat4[] mA,mat4[] mB,mat4[] mC){
	assert(mA.length==mB.length && mB.length==mC.length);
	foreach(i;0..mA.length){
		mC[i][]=mA[i][]*mB[i][];
	}
}
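
//Added sketch: a minimal sanity check of mulMat, assuming the intended semantics
//are mC[i] = mA[i] * mB[i] element-wise (the original body multiplied mB by itself
//and never read mA, which looked like a typo).
unittest{
	mat4[1] a;
	mat4[1] b;
	mat4[1] c;
	a[0][]=2;//fill all 16 components with 2
	b[0][]=3;//fill all 16 components with 3
	mulMat(a[],b[],c[]);
	foreach(v;c[0])assert(v==6);//2*3 per component
}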

//Benchmark globals: result holds the last benchmark score, base is a reference score to compare against (see testScalability)
__gshared float result;
__gshared float base=1;
void testPerformanceMatrix(){
	uint partsNum=16;
	uint iterations=100;
	uint matricesNum=512;
	assert(matricesNum%partsNum==0);
	mat4[] matricesA=Mallocator.instance.makeArray!mat4(matricesNum);
	mat4[] matricesB=Mallocator.instance.makeArray!mat4(matricesNum);
	mat4[] matricesC=Mallocator.instance.makeArray!mat4(matricesNum);
	scope(exit){
		Mallocator.instance.dispose(matricesA);
		Mallocator.instance.dispose(matricesB);
		Mallocator.instance.dispose(matricesC);
	}
	StopWatch sw;
	sw.start();
	jobManager.debugHelper.resetCounters();
	uint step=matricesNum/partsNum;

	alias ddd=typeof(&mulMat);
	UniversalJobGroup!ddd group=UniversalJobGroup!ddd(partsNum);
	foreach(int i;0..partsNum){
		group.add(&mulMat,matricesA[i*step..(i+1)*step],matricesB[i*step..(i+1)*step],matricesC[i*step..(i+1)*step]);
	}
	foreach(i;0..iterations){
		group.callAndWait();
	}
	sw.stop();
	result=cast(float)iterations*matricesNum/sw.usecs;
	writefln( "BenchmarkMatrix: %s*%s : %s[us], %s[it/us] %s",iterations,matricesNum,sw.usecs,cast(float)iterations*matricesNum/sw.usecs,result/base);
}

void testForeach(){
	int[200] ints;
	shared uint counter=0;//renamed from 'sum' to avoid shadowing std.algorithm.sum
	foreach(ref int el;ints.multithreated){
		atomicOp!"+="(counter,1);
		activeSleep(100);//simulate load for 100us
	}
	foreach(ref int el;ints.multithreated){
		activeSleep(100);
	}
	assert(counter==200);
}
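//Note (added): multithreated is the job manager's parallel-foreach helper; it
//presumably splits the range into chunks and dispatches them as jobs, blocking
//until all chunks finish, so all 200 increments land before the assert runs.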

void testGroupStart(){
	if(jobManager.threadsNum==1){
		writeln("Cannot have a background job while there is only one thread in the thread pool.");
		return;
	}
	uint partsNum=100;

	alias ddd=typeof(&activeSleep);
	UniversalJobGroup!ddd group=UniversalJobGroup!ddd(partsNum);
	foreach(int i;0..partsNum){
		group.add(&activeSleep,10);
	}
	group.start();
	activeSleep(10);
	assert(group.counter.count>0 && !group.counter.countedToZero());
	activeSleep(10000);
	assert(group.areJobsDone);
}
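//Note (added): unlike callAndWait, start() only enqueues the group and returns
//immediately; the first assert checks the jobs are still in flight right after
//start(), the second that they have all finished after a generous 10ms sleep.
//Being timing-based, this test is inherently racy on a heavily loaded machine.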

void test(uint threadsNum=16){

	static void startTest(){
		foreach(i;0..1){
			alias UnDel=void delegate();
			testForeach();
			makeTestJobsFrom(&testFiberLockingToThread,100);
			callAndWait!(UnDel)((&testUnique).toDelegate);
			callAndWait!(UnDel)((&testPerformance).toDelegate);
			callAndWait!(UnDel)((&testPerformanceMatrix).toDelegate);
			callAndWait!(UnDel)((&testPerformanceSleep).toDelegate);
			callAndWait!(UnDel)((&testGroupStart).toDelegate);
			callAndWait!(UnDel)((&testRandomRecursionJobs).toDelegate);
			//int[] pp=	new int[1000];//Make some garbage so GC would trigger
		}
	}
	jobManager.startMainLoop(&startTest,threadsNum);
}

void testScalability(){
	//foreach(i;3..4){
	//	jobManager=Mallocator.instance.make!JobManager;
	//	scope(exit)Mallocator.instance.dispose(jobManager);
	//	write(i+1," ");
	test(4);
	//if(i==0)base=result;
	//}
}


unittest{
	//test();
}