/**
Module implementing a multithreaded job manager based on fibers (coroutines).
Thanks to fibers, any task can be suspended in the middle of execution and resumed later by this manager.
Fibers are bound to a single thread due to TLS issues and for performance reasons.
A minimal usage sketch can be found in the version(none) unittest at the end of this module.
*/

module mutils.job_manager.manager_multithreaded;
//module mutils.job_manager.manager_multithreadeddd; version(none):

import core.atomic;
import core.stdc.stdio;
import core.stdc.stdlib : rand;
import std.functional : toDelegate;
import std.stdio;

import mutils.container.vector;
import mutils.container_shared.shared_queue;
import mutils.job_manager.fiber_cache;
import mutils.job_manager.manager_utils;
import mutils.job_manager.utils;
import mutils.thread : Fiber, Thread, Semaphore, instructionPause;

enum threadsPerCPU = 4;

alias JobVector = LowLockQueue!(JobDelegate*);
alias FiberVector = LowLockQueue!(FiberData);

__gshared JobManager jobManager;

struct JobManager {
    struct DebugHelper {
        align(64) shared uint jobsAdded;
        align(64) shared uint jobsDone;
        align(64) shared uint fibersAdded;
        align(64) shared uint fibersDone;

        void resetCounters() {
            atomicStore(jobsAdded, 0);
            atomicStore(jobsDone, 0);
            atomicStore(fibersAdded, 0);
            atomicStore(fibersDone, 0);
        }

        void jobsAddedAdd(int num = 1) {
            debug atomicOp!"+="(jobsAdded, num);
        }

        void jobsDoneAdd(int num = 1) {
            debug atomicOp!"+="(jobsDone, num);
        }

        void fibersAddedAdd(int num = 1) {
            debug atomicOp!"+="(fibersAdded, num);
        }

        void fibersDoneAdd(int num = 1) {
            debug atomicOp!"+="(fibersDone, num);
        }

    }

    int threadsCount;
    DebugHelper debugHelper;
    // Jobs management
    private int addJobToQueueNum;
    private Vector!JobVector waitingJobs;
    // Fibers management
    private Vector!FiberTLSCache fibersCache;
    private Vector!FiberVector waitingFibers;
    // Thread management
    private Vector!Thread threadPool;
    private Vector!Semaphore semaphores;
    private bool exit;

    private void initialize(uint threadsCount = 0) {
        exit = false;

        if (threadsCount == 0)
            threadsCount = threadsPerCPU;
        if (threadsCount == 0)
            threadsCount = 4;

        this.threadsCount = threadsCount;

        waitingFibers.length = threadsCount;
        threadPool.length = threadsCount;
        waitingJobs.length = threadsCount;
        semaphores.length = threadsCount;
        fibersCache.length = threadsCount;

        foreach (uint i; 0 .. threadsCount) {
            waitingFibers[i].initialize();
            semaphores[i].initialize();
            threadPool[i].threadNum = i;
            threadPool[i].setDg(&threadRunFunction);
            waitingJobs[i].initialize();
        }

        version (Android)
            rt_init();
    }

    void clear() {
        foreach (i; 0 .. threadsCount) {
            waitingJobs[i].clear();
            waitingFibers[i].clear();
            fibersCache[i].clear();
            semaphores[i].destroy();
        }
        waitingFibers.clear();
        waitingJobs.clear();
        threadPool.clear();
        fibersCache.clear();
        semaphores.clear();
    }

    void start() {
        foreach (ref thread; threadPool) {
            thread.start();
        }
    }

    void startMainLoop(void function() mainLoop, uint threadsCount = 0) {
        startMainLoop(mainLoop.toDelegate, threadsCount);
    }

    void startMainLoop(JobDelegate mainLoop, uint threadsCount = 0) {

        align(64) shared bool endLoop = false;
        static struct NoGcDelegateHelper {
            JobDelegate del;
            shared bool* endPointer;

            this(JobDelegate del, ref shared bool end) {
                this.del = del;
                endPointer = &end;
            }

            void call() {
                del();
                atomicStore(*endPointer, true);
            }
        }

        NoGcDelegateHelper helper = NoGcDelegateHelper(mainLoop, endLoop);
        initialize(threadsCount);
        auto del = &helper.call;
        start();
        addJob(&del);
        waitForEnd(endLoop);
        end();
    }

    void waitForEnd(ref shared bool end) {
        bool wait = true;
        do {
            wait = !atomicLoad(end);
            foreach (ref th; threadPool) {
                if (!th.isRunning) {
                    wait = false;
                }
            }
            Thread.sleep(10);
        }
        while (wait);
    }

    void end() {
        exit = true;
        foreach (i; 0 .. threadsCount) {
            semaphores[i].post();
        }
        foreach (ref thread; threadPool) {
            thread.join();
        }
        version (Android)
            rt_close();

    }

    void addFiber(FiberData fiberData) {
        while (fiberData.fiber.state == Fiber.State.EXEC) {
            instructionPause();
        }
        assertKM(waitingFibers.length == threadPool.length);
        assertKM(fiberData.fiber.state != Fiber.State.TERM && fiberData.fiber.state != Fiber.State.EXEC); // addThisFiberAndYield cannot go through addFiber because it would violate this assertion
        debugHelper.fibersAddedAdd();
        waitingFibers[fiberData.threadNum].add(fiberData);
        semaphores[fiberData.threadNum].post();
    }

    // Only for tests - it is pointless to re-add the current fiber just to have it resumed immediately
    void addThisFiberAndYield(FiberData thisFiber) {
        //writeln("addThisFiberAndYield");
        debugHelper.fibersAddedAdd();
        waitingFibers[thisFiber.threadNum].add(thisFiber);
        semaphores[thisFiber.threadNum].post();
        Fiber.yield();
    }

    void addJob(JobDelegate* del) {
        debugHelper.jobsAddedAdd();

        int queueNum = addJobToQueueNum % threadsCount;
        waitingJobs[queueNum].add(del);
        semaphores[queueNum].post();
        addJobToQueueNum++;
    }

    void addJobs(JobDelegate*[] dels) {
        debugHelper.jobsAddedAdd(cast(int) dels.length);

        int part = cast(int) dels.length / threadsCount;
        if (part > 0) {
            foreach (i, ref wj; waitingJobs) {
                wj.add(dels[i * part .. (i + 1) * part]);

                foreach (kkk; 0 .. part) {
                    semaphores[i].post();
                }
            }
            dels = dels[part * waitingJobs.length .. $];
        }
        foreach (del; dels) {
            int queueNum = addJobToQueueNum % threadsCount;
            waitingJobs[queueNum].add(del);
            semaphores[queueNum].post();
            addJobToQueueNum++;
        }
    }

    void addJobAndYield(JobDelegate* del) {
        addJob(del);
        Fiber.yield();
    }

    void addJobsAndYield(JobDelegate*[] dels) {
        addJobs(dels);
        Fiber.yield();
    }

    Fiber allocateFiber(JobDelegate del, int threadNum) {
        Fiber fiber = fibersCache[threadNum].getData();
        assertKM(fiber.state == Fiber.State.TERM);
        assertKM(fiber.myThreadNum == jobManagerThreadNum);
        fiber.reset(del);
        return fiber;
    }

    void deallocateFiber(Fiber fiber, int threadNum) {
        fiber.threadStart = null;
        fibersCache[threadNum].removeData(fiber);
    }

    Fiber getFiberOwnerThread(int threadNum) {
        Fiber fiber;
        FiberData fd = waitingFibers[threadNum].pop;
        if (fd != FiberData.init) {
            fiber = fd.fiber;
            debugHelper.fibersDoneAdd();
        } else {
            JobDelegate* job = waitingJobs[threadNum].pop();
            if (job !is null) {
                debugHelper.jobsDoneAdd();
                fiber = allocateFiber(*job, threadNum);
            }
        }
        return fiber;
    }

    Fiber getFiberThiefThread(int threadNum) {
        Fiber fiber;
        foreach (thSteal; 0 .. threadsCount) {
            if (thSteal == threadNum) {
                continue; // Do not steal from yourself
            }
            if (!semaphores[thSteal].tryWait()) {
                continue;
            }
            JobDelegate* job = waitingJobs[thSteal].pop();
            if (job !is null) {
                debugHelper.jobsDoneAdd();
                fiber = allocateFiber(*job, threadNum);
                break;
            } else {
                semaphores[thSteal].post(); // Cannot steal, give the owner a chance to take it
            }
        }
        return fiber;
    }

    void threadRunFunction() {
        Fiber.initializeStatic();
        int threadNum = jobManagerThreadNum;
        while (!exit) {
            Fiber fiber;
            if (semaphores[threadNum].tryWait()) {
                fiber = getFiberOwnerThread(threadNum);
            } else {
                fiber = getFiberThiefThread(threadNum);
                if (fiber is null) {
                    // Thread does not have its own job and cannot steal one, so wait for a job
                    // semaphores[threadNum].wait(); // There is a deadlock in some cases. For now use timedWait instead of wait
                    bool ok = semaphores[threadNum].timedWait(1_000_000);

                    if (ok == false) {
                        //writeln("Semaphore 1s timed out...");
                    }
                    fiber = getFiberOwnerThread(threadNum);
                }
            }

            // Nothing to do
            if (fiber is null) {
                continue;
            }
            // Do the job
            assertKM(fiber.state == Fiber.State.HOLD);
            fiber.call();

            // Reuse fiber
            if (fiber.state == Fiber.State.TERM) {
                deallocateFiber(fiber, threadNum);
            }
        }
    }

}
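
// A minimal usage sketch (illustration only, not compiled in; enable by changing version(none)).
// It relies solely on symbols visible in this module: the global `jobManager`, `startMainLoop`
// and `addJob`. Assumption: `JobDelegate` is a plain `void delegate()` alias defined in
// mutils.job_manager.manager_utils.
version (none) unittest {
    static shared int jobsRun;

    static void someJob() {
        atomicOp!"+="(jobsRun, 1);
    }

    static void mainLoop() {
        // addJob stores a pointer to the delegate, so the delegate itself has to
        // outlive the call; a static variable is the simplest way to ensure that here.
        static JobDelegate job;
        job = (&someJob).toDelegate;
        // Fire-and-forget: waiting for completion needs the fiber re-adding
        // machinery (addFiber / addJobAndYield) provided elsewhere in the package.
        jobManager.addJob(&job);
    }

    // Spawns 4 worker threads, runs mainLoop as the first job on one of them
    // and returns after mainLoop's delegate has finished.
    jobManager.startMainLoop(&mainLoop, 4);
}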