1 /** 2 Multithreated test may take some space so they are there. 3 */ 4 module mutils.job_manager.manager_tests; 5 6 import core.atomic; 7 import core.memory; 8 import core.simd; 9 import core.thread : Thread,ThreadID,Fiber; 10 11 import std.algorithm : sum; 12 import std.functional : toDelegate; 13 import std.random : uniform; 14 import std.stdio : write,writeln,writefln; 15 16 import mutils.benchmark; 17 import mutils.job_manager.debug_sink; 18 import mutils.job_manager.manager; 19 import mutils.job_manager.shared_utils; 20 import mutils.job_manager.utils; 21 22 23 /// One Job and one Fiber.yield 24 void simpleYield(){ 25 auto fiberData=getFiberData(); 26 foreach(i;0..1){ 27 jobManager.addThisFiberAndYield(fiberData); 28 } 29 } 30 31 32 void activeSleep(uint u_seconds){ 33 StopWatch sw; 34 sw.start(); 35 while(sw.usecs<u_seconds){}//for 10us will iterate ~120 tiems 36 sw.stop(); 37 38 } 39 40 void veryDummy(){ 41 //foreach(i;0..10)writeln("asdasd"); 42 } 43 44 void makeTestJobsFrom(void function() fn,uint num){ 45 makeTestJobsFrom(fn.toDelegate,num); 46 } 47 48 void makeTestJobsFrom(JobDelegate deleg,uint num){ 49 UniversalJobGroup!JobDelegate group=UniversalJobGroup!JobDelegate(num); 50 foreach(int i;0..num){ 51 group.add(deleg); 52 } 53 group.callAndWait(); 54 } 55 56 void testFiberLockingToThread(){ 57 ThreadID id=Thread.getThis.id; 58 auto fiberData=getFiberData(); 59 foreach(i;0..1000){ 60 jobManager.addThisFiberAndYield(fiberData); 61 assert(id==Thread.getThis.id); 62 } 63 } 64 65 //returns how many jobs it have spawned 66 int randomRecursionJobs(int deepLevel){ 67 alias UD=UniversalDelegate!(int function(int)); 68 if(deepLevel==0){ 69 simpleYield(); 70 return 0; 71 } 72 int randNum=uniform(1,10); 73 //randNum=10; 74 75 alias ddd=typeof(&randomRecursionJobs); 76 UniversalJobGroup!ddd group=UniversalJobGroup!ddd(randNum); 77 foreach(int i;0..randNum){ 78 group.add(&randomRecursionJobs,deepLevel-1); 79 } 80 auto jobsRun=group.callAndWait(); 81 return sum(jobsRun)+randNum; 82 } 83 84 //returns how many jobs it have spawned 85 void testRandomRecursionJobs(){ 86 jobManager.debugHelper.resetCounters(); 87 int jobsRun=callAndWait!(typeof(&randomRecursionJobs))(&randomRecursionJobs,5); 88 assert(jobManager.debugHelper.jobsAdded==jobsRun+1); 89 assert(jobManager.debugHelper.jobsDone==jobsRun+1); 90 assert(jobManager.debugHelper.fibersAdded==jobsRun+2); 91 assert(jobManager.debugHelper.fibersDone==jobsRun+2); 92 } 93 94 95 void testPerformance(){ 96 uint iterations=1000; 97 uint packetSize=100; 98 StopWatch sw; 99 sw.start(); 100 jobManager.debugHelper.resetCounters(); 101 alias ddd=typeof(&simpleYield); 102 UniversalJobGroup!ddd group=UniversalJobGroup!ddd(packetSize); 103 foreach(int i;0..packetSize){ 104 group.add(&simpleYield); 105 } 106 //int[] pp= new int[100]; 107 foreach(i;0..iterations){ 108 group.callAndWait(); 109 } 110 111 112 assertM(jobManager.debugHelper.jobsAdded,iterations*packetSize); 113 assertM(jobManager.debugHelper.jobsDone ,iterations*packetSize); 114 assertM(jobManager.debugHelper.fibersAdded,iterations*packetSize+iterations); 115 assertM(jobManager.debugHelper.fibersDone ,iterations*packetSize+iterations); 116 sw.stop(); 117 writefln( "Benchmark: %s*%s : %s[ms], %s[it/ms]",iterations,packetSize,sw.msecs,iterations*packetSize/sw.msecs); 118 } 119 120 void testUnique(){ 121 shared int myCounter; 122 void localYield(){ 123 auto fiberData=getFiberData(); 124 DebugSink.add(atomicOp!"+="(myCounter,1)); 125 jobManager.addThisFiberAndYield(fiberData); 126 } 127 jobManager.debugHelper.resetCounters(); 128 DebugSink.reset(); 129 uint packetSize=1000; 130 131 alias ddd=typeof(&localYield); 132 UniversalJobGroup!ddd group=UniversalJobGroup!ddd(packetSize); 133 foreach(int i;0..packetSize){ 134 group.add(&localYield); 135 } 136 group.callAndWait(); 137 138 assertM(jobManager.debugHelper.jobsAdded,packetSize); 139 assertM(jobManager.debugHelper.jobsDone ,packetSize); 140 assertM(jobManager.debugHelper.fibersAdded,packetSize+1); 141 assertM(jobManager.debugHelper.fibersDone ,packetSize+1); 142 DebugSink.verifyUnique(packetSize); 143 } 144 145 146 void testPerformanceSleep(){ 147 uint partsNum=1000; 148 uint iterations=60; 149 uint u_secs=13; 150 151 152 alias ddd=typeof(&activeSleep); 153 UniversalJobGroup!ddd group=UniversalJobGroup!ddd(partsNum); 154 foreach(int i;0..partsNum){ 155 group.add(&activeSleep,u_secs); 156 } 157 StopWatch sw; 158 sw.start(); 159 foreach(i;0..iterations){ 160 group.callAndWait(); 161 } 162 163 164 sw.stop(); 165 result=cast(float)iterations*u_secs/sw.usecs; 166 167 writefln( "BenchmarkSleep: %s*%s : %s[us], %s[it/us] %s",iterations,u_secs,sw.usecs,cast(float)iterations*u_secs/sw.usecs,result/base); 168 } 169 170 171 172 173 174 alias mat4=float[16]; 175 //import gl3n.linalg; 176 void mulMat(mat4[] mA,mat4[] mB,mat4[] mC){ 177 assert(mA.length==mB.length && mB.length==mC.length); 178 foreach(i;0..mA.length){ 179 foreach(k;0..1){ 180 mC[i]=mB[i][]*mB[i][]; 181 } 182 } 183 } 184 185 __gshared float result; 186 __gshared float base=1; 187 void testPerformanceMatrix(){ 188 import std.parallelism; 189 uint partsNum=16; 190 uint iterations=100; 191 uint matricesNum=512; 192 assert(matricesNum%partsNum==0); 193 mat4[] matricesA=Mallocator.instance.makeArray!mat4(matricesNum); 194 mat4[] matricesB=Mallocator.instance.makeArray!mat4(matricesNum); 195 mat4[] matricesC=Mallocator.instance.makeArray!mat4(matricesNum); 196 scope(exit){ 197 Mallocator.instance.dispose(matricesA); 198 Mallocator.instance.dispose(matricesB); 199 Mallocator.instance.dispose(matricesC); 200 } 201 StopWatch sw; 202 sw.start(); 203 jobManager.debugHelper.resetCounters(); 204 uint step=matricesNum/partsNum; 205 206 alias ddd=typeof(&mulMat); 207 UniversalJobGroup!ddd group=UniversalJobGroup!ddd(partsNum); 208 foreach(int i;0..partsNum){ 209 group.add(&mulMat,matricesA[i*step..(i+1)*step],matricesB[i*step..(i+1)*step],matricesC[i*step..(i+1)*step]); 210 } 211 foreach(i;0..iterations){ 212 group.callAndWait(); 213 } 214 215 216 sw.stop(); 217 result=cast(float)iterations*matricesNum/sw.usecs; 218 writefln( "BenchmarkMatrix: %s*%s : %s[us], %s[it/us] %s",iterations,matricesNum,sw.usecs,cast(float)iterations*matricesNum/sw.usecs,result/base); 219 } 220 221 void testForeach(){ 222 int[200] ints; 223 shared uint sum=0; 224 foreach(ref int el;ints.multithreated){ 225 atomicOp!"+="(sum,1); 226 activeSleep(100);//simulate load for 100us 227 } 228 foreach(ref int el;ints.multithreated){ 229 activeSleep(100); 230 } 231 assert(sum==200); 232 } 233 234 void testGroupStart(){ 235 if(jobManager.threadsNum==1){ 236 writeln("Can not have background job while there is only one thread in threadPool."); 237 return; 238 } 239 uint partsNum=100; 240 241 alias ddd=typeof(&activeSleep); 242 UniversalJobGroup!ddd group=UniversalJobGroup!ddd(partsNum); 243 foreach(int i;0..partsNum){ 244 group.add(&activeSleep,10); 245 } 246 group.start(); 247 activeSleep(10); 248 assert(group.counter.count>0 && !group.counter.countedToZero()); 249 activeSleep(10000); 250 assert(group.areJobsDone); 251 252 } 253 254 void test(uint threadsNum=16){ 255 256 static void startTest(){ 257 foreach(i;0..1){ 258 alias UnDel=void delegate(); 259 testForeach(); 260 makeTestJobsFrom(&testFiberLockingToThread,100); 261 callAndWait!(UnDel)((&testUnique).toDelegate); 262 callAndWait!(UnDel)((&testPerformance).toDelegate); 263 callAndWait!(UnDel)((&testPerformanceMatrix).toDelegate); 264 callAndWait!(UnDel)((&testPerformanceSleep).toDelegate); 265 callAndWait!(UnDel)((&testGroupStart).toDelegate); 266 callAndWait!(UnDel)((&testRandomRecursionJobs).toDelegate); 267 //int[] pp= new int[1000];//Make some garbage so GC would trigger 268 } 269 270 } 271 jobManager.startMainLoop(&startTest,threadsNum); 272 } 273 void testScalability(){ 274 //foreach(i;3..4){ 275 // jobManager=Mallocator.instance.make!JobManager; 276 // scope(exit)Mallocator.instance.dispose(jobManager); 277 // write(i+1," "); 278 test(4); 279 //if(i==0)base=result; 280 //} 281 } 282 283 284 unittest{ 285 //test(); 286 }