/**
Module contains basic structures for the job manager, e.g. FiberData and Counter.
It also contains structures/functions which extend the functionality of the job manager, like:
 - UniversalJob - job with parameters and a return value
 - UniversalJobGroup - group of jobs
 - multithreaded - makes foreach execute in parallel
*/
module mutils.job_manager.manager_utils;

import core.atomic;
import core.stdc.stdio;
import core.stdc.string : memcpy, memset;
import std.algorithm : map;
import std.experimental.allocator;
import std.experimental.allocator.mallocator;
import std.traits : Parameters;
import std.stdio;

import mutils.container.vector;
import mutils.job_manager.manager;
import mutils.job_manager.utils;
import mutils.thread;
import mutils.thread : Fiber;

alias jobManagerThreadNum = Thread.getThisThreadNum; // thread-local var

alias JobDelegate = void delegate();

// Fiber plus the thread it is bound to, so it can be resumed on the right thread
struct FiberData {
    Fiber fiber;
    uint threadNum;
}

FiberData getFiberData() {
    Fiber fiber = Fiber.getThis();
    //printf("getFiberData fiber: %p\n", fiber);
    assertKM(fiber !is null);
    return FiberData(fiber, jobManagerThreadNum);
}

// Counts outstanding jobs; the last decrement resumes the waiting fiber, if any
struct Counter {
    align(64) shared int count;
    align(64) FiberData waitingFiber;

    this(uint count) {
        this.count = count;
    }

    bool countedToZero() {
        return atomicLoad(count) == 0;
    }

    void decrement() {
        assertKM(atomicLoad(count) >= 0);

        auto num = atomicOp!"-="(count, 1);
        if (num == 0 && waitingFiber.fiber !is null) {
            jobManager.addFiber(waitingFiber);
        }
    }
}

struct UniversalJob(Delegate) {
    alias UnDelegate = UniversalDelegate!Delegate;
    UnDelegate unDel; // user function to run, with parameters and return value
    JobDelegate runDel; // wrapper which decrements the counter at the end
    Counter* counter;

    void runWithCounter() {
        assertKM(counter !is null);
        unDel.callAndSaveReturn();
        static if (multithreadedManagerON) {
            counter.decrement();
        }
    }

    // The job has to be allocated by Mallocator, because it disposes itself
    void runAndDeleteMyself() {
        unDel.callAndSaveReturn();
        Mallocator.instance.dispose(&this);
    }

    void initialize(Delegate del, Parameters!(Delegate) args) {
        unDel = makeUniversalDelegate!(Delegate)(del, args);
        //runDel=&run;
    }
}
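
// A minimal sanity check of the Counter protocol; safe to run without a job
// manager because no fiber is registered in waitingFiber, so the last
// decrement() has nothing to resume.
unittest {
    Counter counter = Counter(3);
    foreach (_; 0 .. 3) {
        assert(!counter.countedToZero());
        counter.decrement();
    }
    assert(counter.countedToZero());
}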
// It is faster to add an array of jobs than to add jobs one by one
struct UniversalJobGroupNew {
    enum int fiberWillYieldNum = 100000;
    alias Delegate = void delegate();
    //string name;
    bool runOnJobsDone;
    bool spawnOnDependenciesFulfilled = true;
    align(64) shared int dependicesWaitCount;
    Vector!(UniversalJobGroupNew*) children;

    Vector!Job jobs;
    Vector!(JobDelegate*) jobPointers;

    align(64) shared int countJobsToBeDone;
    align(64) FiberData waitingFiber;

    static struct Job {
        UniversalJobGroupNew* group;
        void function(ubyte*) delegateUNB;
        Delegate delegateJob; // JobManager takes a pointer to a delegate, so the callJob delegate (&callJob) is stored here
        ubyte[64] memoryForUserDelegate;

        void callJob() {
            delegateUNB(memoryForUserDelegate.ptr);

            auto num = atomicOp!"-="(group.countJobsToBeDone, 1);
            if (num == 0) {
                group.onJobsCounterZero();
            }

            // waitForCompletion() adding fiberWillYieldNum means: "I will yield and wait to be resumed".
            // Make sure that fiber really gets resumed, by forcing runOnJobsDoneOverride to true
            if (num == fiberWillYieldNum) {
                group.onJobsCounterZero(true);
            }
        }
    }

    this(int jobsNum) {
        jobs.reserve(jobsNum);
    }

    void dependantOn(UniversalJobGroupNew* parent) {
        parent.children ~= &this;
        atomicOp!"+="(dependicesWaitCount, 1);
    }

    // runOnJobsDoneOverride is used when runOnJobsDone is meant to be set, but due to
    // multithreading the write might not be visible here yet
    void onJobsCounterZero(bool runOnJobsDoneOverride = false) {
        decrementChildrenDependices();

        if (runOnJobsDone || runOnJobsDoneOverride) {
            jobManager.addFiber(waitingFiber);
        }
    }

    void decrementChildrenDependices() {
        foreach (UniversalJobGroupNew* group; children) {
            assertKM(atomicLoad(group.dependicesWaitCount) >= 0);

            auto num = atomicOp!"-="(group.dependicesWaitCount, 1);
            if (num == 0) {
                group.onDependicesCounterZero();
            }
        }
    }

    void onDependicesCounterZero() {
        if (spawnOnDependenciesFulfilled) {
            start();
        }
    }

    void start() {
        if (jobs.length != 0) {
            setUpJobs();
            jobManager.addJobs(jobPointers[]);
        } else {
            // Immediately call group end
            onJobsCounterZero();
        }
    }

    auto callAndWait() {
        if (jobs.length != 0) {
            // Add jobs and wait
            runOnJobsDone = true;
            waitingFiber = getFiberData();
            setUpJobs();
            jobManager.addJobsAndYield(jobPointers[]);
        } else {
            // Immediately call group end
            onJobsCounterZero();
        }
    }

    void waitForCompletion() {
        if (atomicLoad(dependicesWaitCount) == 0 && atomicLoad(countJobsToBeDone) == 0) {
            return;
        }
        waitingFiber = getFiberData();
        auto num = atomicOp!"+="(countJobsToBeDone, fiberWillYieldNum);
        if (atomicLoad(dependicesWaitCount) == 0 && num == fiberWillYieldNum) {
            // The decrementer saw 0 jobs left, so it might not resume us if we yielded now
            return;
        }
        runOnJobsDone = true;
        Fiber.yield();
    }

    void add(DG)(DG del, Parameters!(DG) args) {
        auto und = UniversalDelegate!(typeof(del))(del, args);
        static assert(und.sizeof <= Job.memoryForUserDelegate.length);
        ubyte[] delBytes = und.toBytes();

        Job job;
        job.group = &this;
        job.memoryForUserDelegate[0 .. delBytes.length] = delBytes[0 .. delBytes.length];
        job.delegateUNB = &und.callFromBytes;

        jobs ~= job;
    }

    auto setUpJobs() {
        atomicOp!"+="(countJobsToBeDone, cast(int) jobs.length);
        jobPointers.length = jobs.length;
        foreach (i, ref jj; jobs) {
            jj.delegateJob = &jj.callJob;
            jobPointers[i] = &jj.delegateJob;
        }
    }
}
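
// Hedged usage sketch (kept out of the build with version(none)): two groups,
// where `second` is spawned only after all jobs from `first` finish. Assumes a
// running JobManager and that this code executes inside one of its fibers;
// `prepare` and `consume` are hypothetical user jobs.
version (none) unittest {
    void prepare() { /* produce data */ }
    void consume() { /* use data */ }

    auto first = UniversalJobGroupNew(1);
    auto second = UniversalJobGroupNew(1);
    first.add(&prepare);
    second.add(&consume);
    second.dependantOn(&first); // second waits for first
    first.start(); // kicks off the chain; second starts automatically
    second.waitForCompletion(); // yields this fiber until second is done
}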
// It is faster to add an array of jobs than to add jobs one by one
struct UniversalJobGroup(Delegate) {
    alias DelegateOnEnd = void delegate();
    alias UnJob = UniversalJob!(Delegate);

    Counter counter;
    uint jobsAdded;
    UnJob[] unJobs;
    JobDelegate*[] dels;

    @disable this();
    @disable this(this);

    this(uint jobsNum) {
        mallocatorAllocate(jobsNum);
    }

    ~this() {
        mallocatorDeallocate();
    }

    void add(Delegate del, Parameters!(Delegate) args) {
        unJobs[jobsAdded].initialize(del, args);
        jobsAdded++;
    }

    // Returns data like getReturnData()
    auto callAndWait() {
        setUpJobs();
        counter.waitingFiber = getFiberData();
        jobManager.addJobsAndYield(dels[0 .. jobsAdded]);
        static if (UnJob.UnDelegate.hasReturn) {
            return getReturnData();
        }
    }

    bool areJobsDone() {
        return counter.countedToZero();
    }

    /// Returns a range, so you can allocate the results as you want.
    /// But remember: the returned data lives only as long as this object
    auto getReturnData()() {
        static assert(UnJob.UnDelegate.hasReturn);
        assertKM(areJobsDone);
        return unJobs.map!(a => a.unDel.result);
    }

    auto start() {
        setUpJobs();
        jobManager.addJobs(dels[0 .. jobsAdded]);
    }

private:
    auto setUpJobs() {
        counter.count = jobsAdded;
        foreach (i, ref unJob; unJobs[0 .. jobsAdded]) {
            unJob.counter = &counter;
            unJob.runDel = &unJob.runWithCounter;
            dels[i] = &unJob.runDel;
        }
    }

    void mallocatorAllocate(uint jobsNum) {
        unJobs = Mallocator.instance.makeArray!(UnJob)(jobsNum);
        dels = Mallocator.instance.makeArray!(JobDelegate*)(jobsNum);
    }

    void mallocatorDeallocate() {
        memset(unJobs.ptr, 0, UnJob.sizeof * unJobs.length);
        memset(dels.ptr, 0, (JobDelegate*).sizeof * dels.length);
        Mallocator.instance.dispose(unJobs);
        Mallocator.instance.dispose(dels);
    }
}

// Runs the delegate as a job and yields the current fiber until it finishes;
// returns the job's result if it has one
auto callAndWait(Delegate)(Delegate del, Parameters!(Delegate) args) {
    UniversalJob!(Delegate) unJob;
    unJob.initialize(del, args);
    unJob.runDel = &unJob.runWithCounter;
    Counter counter;
    counter.count = 1;
    counter.waitingFiber = getFiberData();
    unJob.counter = &counter;
    jobManager.addJobAndYield(&unJob.runDel);
    static if (unJob.unDel.hasReturn) {
        return unJob.unDel.result;
    }
}

// Fire-and-forget: schedules the job and returns immediately; the job frees
// itself when done, so the delegate cannot have a return value
auto callAndNothing(Delegate)(Delegate del, Parameters!(Delegate) args) {
    UniversalJob!(Delegate)* unJob = Mallocator.instance.make!(UniversalJob!(Delegate));
    static assert(!unJob.unDel.hasReturn); // the result would be lost together with the job
    unJob.initialize(del, args);
    unJob.runDel = &unJob.runAndDeleteMyself;
    jobManager.addJob(&unJob.runDel);
}
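
// Hedged usage sketch (kept out of the build with version(none)): a group of
// jobs with return values, gathered lazily via getReturnData(). Assumes a
// running JobManager and execution inside one of its fibers; `twice` is a
// hypothetical user function.
version (none) unittest {
    alias DG = int delegate(int);
    int twice(int num) {
        return 2 * num;
    }

    auto group = UniversalJobGroup!DG(3);
    foreach (int i; 0 .. 3) {
        group.add(&twice, i);
    }
    auto results = group.callAndWait(); // lazy range: 0, 2, 4

    // Single-job variant: schedule one delegate and wait for its result
    int answer = callAndWait(&twice, 21); // 42
}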
// Wraps a slice so that foreach over it executes its body in parallel jobs
auto multithreaded(T)(T[] slice) {

    static struct Tmp {
        import std.traits : ParameterTypeTuple;

        T[] array;
        int opApply(Dg)(scope Dg dg) {
            static assert(ParameterTypeTuple!Dg.length == 1 || ParameterTypeTuple!Dg.length == 2);
            enum hasI = ParameterTypeTuple!Dg.length == 2;
            static if (hasI)
                alias IType = ParameterTypeTuple!Dg[0];
            static struct NoGcDelegateHelper {
                Dg del;
                T[] arr;
                static if (hasI)
                    IType iStart;

                void call() {
                    foreach (int i, ref element; arr) {
                        static if (hasI) {
                            IType iSend = iStart + i;
                            int result = del(iSend, element);
                        } else {
                            int result = del(element);
                        }
                        assertKM(result == 0,
                                "Can't use break, continue, etc. in multithreaded foreach");
                    }
                }
            }

            enum partsNum = 16; // constant number == easy usage of the stack
            if (array.length < partsNum) {
                // Too little work to be worth splitting - iterate in place
                foreach (int i, ref element; array) {
                    static if (hasI) {
                        int result = dg(i, element);
                    } else {
                        int result = dg(element);
                    }
                    assertKM(result == 0, "Can't use break, continue, etc. in multithreaded foreach");
                }
            } else {
                // Split the slice into partsNum chunks, one job per chunk
                NoGcDelegateHelper[partsNum] helpers;
                uint step = cast(uint) array.length / partsNum;

                alias ddd = void delegate();
                UniversalJobGroup!ddd group = UniversalJobGroup!ddd(partsNum);
                foreach (int i; 0 .. partsNum - 1) {
                    helpers[i].del = dg;
                    helpers[i].arr = array[i * step .. (i + 1) * step];
                    static if (hasI)
                        helpers[i].iStart = i * step;
                    group.add(&helpers[i].call);
                }
                // The last chunk takes the remainder of the slice
                helpers[partsNum - 1].del = dg;
                helpers[partsNum - 1].arr = array[(partsNum - 1) * step .. array.length];
                static if (hasI)
                    helpers[partsNum - 1].iStart = (partsNum - 1) * step;
                group.add(&helpers[partsNum - 1].call);

                group.callAndWait();
            }
            return 0;
        }
    }

    Tmp tmp;
    tmp.array = slice;
    return tmp;
}
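
// Hedged usage sketch (kept out of the build with version(none)): parallel
// foreach over a slice, with and without an index. Assumes a running
// JobManager and execution inside one of its fibers.
version (none) unittest {
    int[1024] data;
    foreach (i, ref element; data[].multithreaded) {
        element = cast(int) i; // the body runs split across up to 16 jobs
    }
    foreach (ref element; data[].multithreaded) {
        element += 1; // break/continue are not allowed here
    }
}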