/**
Multithreaded tests may take some space, so they are kept in this separate module.
*/
module mutils.job_manager.manager_tests;

import core.atomic;
import core.simd;
import core.stdc.stdio;
import std.algorithm : sum;
import std.experimental.allocator;
import std.experimental.allocator.mallocator;
import std.functional : toDelegate;

import mutils.job_manager.manager;
import mutils.job_manager.utils;
import mutils.thread : Fiber, Thread;
import mutils.time;

import std.stdio;

/// One job and one Fiber.yield
void simpleYield() {
    auto fiberData = getFiberData();
    foreach (i; 0 .. 1) {
        jobManager.addThisFiberAndYield(fiberData);
    }
}

void simpleSleep(uint msecs) {
    if (msecs == 0) {
        return;
    }
    Thread.sleep(msecs);
}

void activeSleep(uint u_seconds) {
    StopWatch sw;
    sw.start();
    while (sw.usecs < u_seconds) {
    } // for 10us the loop body runs ~120 times
    sw.stop();
}

void makeTestJobsFrom(void function() fn, uint num) {
    makeTestJobsFrom(fn.toDelegate, num);
}

void makeTestJobsFrom(JobDelegate deleg, uint num) {
    UniversalJobGroup!JobDelegate group = UniversalJobGroup!JobDelegate(num);
    foreach (int i; 0 .. num) {
        group.add(deleg);
    }
    group.callAndWait();
}

void testFiberLockingToThread() {
    auto id = Thread.getThisThreadNum();
    auto fiberData = getFiberData();
    foreach (i; 0 .. 1000) {
        jobManager.addThisFiberAndYield(fiberData);
        assertKM(id == Thread.getThisThreadNum());
    }
}

// Returns how many jobs it has spawned.
int randomRecursionJobs(int deepLevel) {
    if (deepLevel == 0) {
        simpleYield();
        return 0;
    }
    int randNum = 7;

    alias ddd = typeof(&randomRecursionJobs);
    UniversalJobGroup!ddd group = UniversalJobGroup!ddd(randNum);
    foreach (int i; 0 .. randNum) {
        group.add(&randomRecursionJobs, deepLevel - 1);
    }
    auto jobsRun = group.callAndWait();
    return sum(jobsRun) + randNum;
}

// Spawns recursive jobs and verifies the job/fiber counters afterwards.
void testRandomRecursionJobs() {
    jobManager.debugHelper.resetCounters();
    int jobsRun = callAndWait!(typeof(&randomRecursionJobs))(&randomRecursionJobs, 5);
    assertM(jobManager.debugHelper.jobsAdded, jobsRun + 1);
    assertM(jobManager.debugHelper.jobsDone, jobsRun + 1);
    assertM(jobManager.debugHelper.fibersAdded, jobsRun + 2);
    assertM(jobManager.debugHelper.fibersDone, jobsRun + 2);
}

void testPerformance() {
    uint iterations = 1000;
    uint packetSize = 1000;
    StopWatch sw;
    sw.start();
    jobManager.debugHelper.resetCounters();
    alias ddd = typeof(&simpleYield);
    // The group is filled once and reused every iteration, so per-call
    // group setup is excluded from the measurement.
    UniversalJobGroupNew group = UniversalJobGroupNew(packetSize);
    foreach (int i; 0 .. packetSize) {
        group.add!ddd(&simpleYield);
    }
    foreach (uint i; 0 .. iterations) {
        group.callAndWait();
    }

    assertM(jobManager.debugHelper.jobsAdded, iterations * packetSize);
    assertM(jobManager.debugHelper.jobsDone, iterations * packetSize);
    assertM(jobManager.debugHelper.fibersAdded, iterations * packetSize + iterations);
    assertM(jobManager.debugHelper.fibersDone, iterations * packetSize + iterations);
    sw.stop();
    long perMs = iterations * packetSize / sw.msecs;
    printf("Performance: %dms, jobs per ms: %d\n", cast(int)(sw.msecs), cast(int)(perMs));
}
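
/*
The tests in this module mix two group flavors without comment: the typed
UniversalJobGroup!T fixes the job type for the whole group and uses a plain
add(), while UniversalJobGroupNew takes the type per add!T call. A minimal,
compiled-out sketch of the two call shapes, assuming only the usage already
visible in this module (groupFlavorsSketch itself is illustrative):
*/
version (none) void groupFlavorsSketch() {
    alias ddd = typeof(&simpleYield);

    // Flavor 1: job type fixed at the group level.
    UniversalJobGroup!ddd typed = UniversalJobGroup!ddd(2);
    typed.add(&simpleYield);
    typed.add(&simpleYield);
    typed.callAndWait(); // blocks the current fiber until both jobs finish

    // Flavor 2: job type supplied per call, so jobs of different
    // signatures can share one group.
    UniversalJobGroupNew mixed = UniversalJobGroupNew(2);
    mixed.add!ddd(&simpleYield);
    mixed.add!ddd(&simpleYield);
    mixed.callAndWait();
}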

shared int myCounter;
void testUnique() {
    import mutils.job_manager.debug_sink;

    myCounter = 0;
    static void localYield() {
        auto fiberData = getFiberData();
        //DebugSink.add(atomicOp!"+="(myCounter, 1));
        jobManager.addThisFiberAndYield(fiberData);
    }

    jobManager.debugHelper.resetCounters();
    //DebugSink.reset();
    uint packetSize = 1000;

    alias ddd = typeof(&localYield);
    UniversalJobGroup!ddd group = UniversalJobGroup!ddd(packetSize);
    foreach (int i; 0 .. packetSize) {
        group.add(&localYield);
    }
    group.callAndWait();

    assertM(jobManager.debugHelper.jobsAdded, packetSize);
    assertM(jobManager.debugHelper.jobsDone, packetSize);
    assertM(jobManager.debugHelper.fibersAdded, packetSize + 1);
    assertM(jobManager.debugHelper.fibersDone, packetSize + 1);
    //DebugSink.verifyUnique(packetSize);
}

void testPerformanceSleep() {
    uint partsNum = 1000;
    uint iterations = 60;
    uint u_secs = 13;

    alias ddd = typeof(&activeSleep);
    UniversalJobGroupNew group = UniversalJobGroupNew(partsNum);
    foreach (int i; 0 .. partsNum) {
        group.add!ddd(&activeSleep, u_secs);
    }
    StopWatch sw;
    sw.start();
    foreach (i; 0 .. iterations) {
        group.callAndWait();
    }

    sw.stop();
    result = cast(float) iterations * u_secs / sw.usecs;
}

alias mat4 = float[16];
// Element-wise multiply used as a uniform workload; despite the name it is
// not a full matrix multiplication. (The original multiplied mB by itself,
// leaving mA unused; fixed to use mA and mB as the assert implies.)
void mulMat(mat4[] mA, mat4[] mB, mat4[] mC) {
    assertKM(mA.length == mB.length && mB.length == mC.length);
    foreach (i; 0 .. mA.length) {
        mC[i][] = mA[i][] * mB[i][];
    }
}

__gshared float result;
__gshared float base = 1;
void testPerformanceMatrix() {
    import std.parallelism;

    uint partsNum = 128;
    uint iterations = 100;
    uint matricesNum = 512 * 64;
    assertKM(matricesNum % partsNum == 0);
    mat4[] matricesA = Mallocator.instance.makeArray!mat4(matricesNum);
    mat4[] matricesB = Mallocator.instance.makeArray!mat4(matricesNum);
    mat4[] matricesC = Mallocator.instance.makeArray!mat4(matricesNum);
    scope (exit) {
        Mallocator.instance.dispose(matricesA);
        Mallocator.instance.dispose(matricesB);
        Mallocator.instance.dispose(matricesC);
    }
    StopWatch sw;
    sw.start();
    jobManager.debugHelper.resetCounters();
    uint step = matricesNum / partsNum;

    alias ddd = typeof(&mulMat);
    UniversalJobGroupNew group = UniversalJobGroupNew(partsNum);
    foreach (int i; 0 .. partsNum) {
        group.add!ddd(&mulMat, matricesA[i * step .. (i + 1) * step],
                matricesB[i * step .. (i + 1) * step], matricesC[i * step .. (i + 1) * step]);
    }
    foreach (i; 0 .. iterations) {
        group.callAndWait();
    }

    sw.stop();
    result = cast(float) iterations * matricesNum / sw.usecs;
    printf("Performance matrix: %dms\n", cast(int)(sw.msecs));
}

void testForeach() {
    int[200] ints;
    shared uint sum = 0;
    foreach (ref int el; ints.multithreaded) {
        atomicOp!"+="(sum, 1);
        activeSleep(100); // simulate load for 100us
    }
    foreach (ref int el; ints.multithreaded) {
        activeSleep(100);
    }
    assertKM(sum == 200);
}

void testGroupStart() {
    if (jobManager.threadsCount == 1) {
        return;
    }
    uint partsNum = 100;

    alias ddd = typeof(&activeSleep);
    UniversalJobGroup!ddd group = UniversalJobGroup!ddd(partsNum);
    foreach (int i; 0 .. partsNum) {
        group.add(&activeSleep, 10);
    }
    group.start();
    activeSleep(10);
    assertKM(group.counter.count > 0 && !group.counter.countedToZero());
    Thread.sleep(1000);
    assertKM(group.areJobsDone);
}

void testGroupsDependicesFuncA(int[] arr) {
    foreach (ref el; arr) {
        el = 1;
    }
}

void testGroupsDependicesFuncB(int[] arr) {
    foreach (ref el; arr) {
        el = 1_000_000;
    }
}

void testGroupsDependicesFuncC(int[] arr) {
    foreach (ref el; arr) {
        el *= 201;
    }
}

void testGroupsDependicesFuncD(int[] arr) {
    foreach (ref el; arr) {
        el -= 90;
    }
}

void testGroupsDependicesFuncE(int[] arrA, int[] arrB) {
    assertKM(arrA.length == arrB.length);
    foreach (i; 0 .. arrA.length) {
        arrA[i] += arrB[i];
        arrA[i] *= -1;
    }
}

void testGroupsDependices() {
    enum uint partsNum = 512;
    enum uint elementsNum = 512 * 512;
    enum uint step = elementsNum / partsNum;
    int[] elements = Mallocator.instance.makeArray!int(elementsNum);
    int[] elements2 = Mallocator.instance.makeArray!int(elementsNum);
    scope (exit) {
        Mallocator.instance.dispose(elements);
        Mallocator.instance.dispose(elements2);
    }
    alias ddd = void function(int[]);
    alias dddSum = void function(int[], int[]);

    auto groupA = UniversalJobGroupNew();
    auto groupB = UniversalJobGroupNew();
    auto groupC = UniversalJobGroupNew();
    auto groupD = UniversalJobGroupNew();
    auto groupE = UniversalJobGroupNew();

    foreach (int i; 0 .. partsNum) {
        groupA.add!ddd(&testGroupsDependicesFuncA, elements[i * step .. (i + 1) * step]);
        groupB.add!ddd(&testGroupsDependicesFuncB, elements2[i * step .. (i + 1) * step]);
        groupC.add!ddd(&testGroupsDependicesFuncC, elements[i * step .. (i + 1) * step]);
        groupD.add!ddd(&testGroupsDependicesFuncD, elements[i * step .. (i + 1) * step]);
        groupE.add!dddSum(&testGroupsDependicesFuncE,
                elements[i * step .. (i + 1) * step], elements2[i * step .. (i + 1) * step]);
    }

    // Dependency graph (groupE waits for both chains):
    // groupA -> groupC -> groupD -> |
    // groupB ---------------------> | groupE
    groupE.dependantOn(&groupD);
    groupE.dependantOn(&groupB);

    groupC.dependantOn(&groupA);
    groupD.dependantOn(&groupC);

    groupA.start();
    groupB.start();

    groupE.waitForCompletion();

    // Per element: A sets 1, C multiplies by 201 (=201), D subtracts 90 (=111),
    // then E adds 1_000_000 from elements2 and negates: -(111 + 1_000_000).
    foreach (el; elements) {
        assertM(el, -1_000_111);
    }
}
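
/*
Distilled from testGroupsDependices above: the dependency protocol used there
is (1) wire edges with dependantOn, (2) start() only the root groups, and
(3) waitForCompletion() on the final group; intermediate groups appear to be
started by the manager once their dependencies finish. A compiled-out sketch
(dependencySketch is illustrative), using only names from this module:
*/
version (none) void dependencySketch() {
    alias ddd = void function(uint);

    auto first = UniversalJobGroupNew();
    auto second = UniversalJobGroupNew();
    first.add!ddd(&simpleSleep, 0);
    second.add!ddd(&simpleSleep, 0);

    second.dependantOn(&first); // 'second' may run only after 'first' is done

    first.start(); // start the root; 'second' is triggered automatically
    second.waitForCompletion();
}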

void testUnbalancedGroups() {
    enum uint groupsNum = 32;
    alias ddd = void function(uint);

    auto groupEnd = UniversalJobGroupNew();
    UniversalJobGroupNew[groupsNum] groups;

    foreach (ref group; groups) {
        group.add!ddd(&simpleSleep, 100);
        foreach (i; 0 .. 32) {
            group.add!ddd(&simpleSleep, 0);
        }
        groupEnd.dependantOn(&group);
    }

    StopWatch sw;
    sw.start();
    foreach (ref group; groups) {
        group.start();
    }

    groupEnd.waitForCompletion();

    sw.stop();
    printf("Performance unbalanced: %dms\n", cast(int)(sw.msecs));
}

void testWaitForGroup() {
    alias ddd = void function(uint);

    UniversalJobGroupNew groupA;
    UniversalJobGroupNew groupEnd;
    groupEnd.dependantOn(&groupA);
    groupA.add!ddd(&simpleSleep, 0);
    groupA.start();
    groupA.waitForCompletion();
    //groupEnd.waitForCompletion();
}

void test(uint threadsNum = 16) {
    import core.memory;

    GC.disable();
    static void startTest() {
        foreach (i; 0 .. 1) {
            alias UnDel = void delegate();
            testForeach();
            makeTestJobsFrom(&testFiberLockingToThread, 100);
            callAndWait!(UnDel)((&testUnique).toDelegate);
            callAndWait!(UnDel)((&testGroupsDependices).toDelegate);
            callAndWait!(UnDel)((&testWaitForGroup).toDelegate);
            callAndWait!(UnDel)((&testPerformance).toDelegate);
            callAndWait!(UnDel)((&testPerformanceMatrix).toDelegate);
            callAndWait!(UnDel)((&testPerformanceSleep).toDelegate);
            callAndWait!(UnDel)((&testUnbalancedGroups).toDelegate);
            //callAndWait!(UnDel)((&testGroupStart).toDelegate); // Needs a long sleep
            callAndWait!(UnDel)((&testRandomRecursionJobs).toDelegate);
        }
    }

    jobManager.startMainLoop(&startTest, threadsNum);
    jobManager.clear();
}

void testScalability() {
    foreach (int i; 1 .. 44) {
        printf(" %d ", i);
        test(i);
    }
}

unittest {
    //test(4);
}
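
/*
A minimal end-to-end usage sketch (kept compiled-out, usageSketch is
illustrative): how a caller outside this test suite might boot the manager,
run one job group, and shut down. It mirrors the pattern of test() above and
assumes startMainLoop blocks until the entry function completes, as test()
appears to rely on; it uses no names beyond those already in this module.
*/
version (none) void usageSketch() {
    static void entry() {
        alias ddd = typeof(&simpleYield);
        UniversalJobGroupNew group = UniversalJobGroupNew(4);
        foreach (i; 0 .. 4) {
            group.add!ddd(&simpleYield);
        }
        group.callAndWait(); // resumes when all four jobs have run
    }

    jobManager.startMainLoop(&entry, 4); // 4 worker threads
    jobManager.clear();
}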