1 /** 2 Module implements multithreated job manager with fibers (coroutines). 3 Thanks to fibers any task can be stopped in the middle of execution and started again by this manager. 4 Fibers are bound to one thread due to TLS issues and performance reasons. 5 */ 6 module mutils.job_manager.manager_multithreated; 7 8 import core.atomic; 9 import core.cpuid : threadsPerCPU; 10 import core.thread : Thread,ThreadID,Fiber; 11 import core.runtime; 12 13 import std.conv : to; 14 import std.datetime; 15 import std.functional : toDelegate; 16 import std.random : uniform; 17 import std.stdio : write,writeln,writefln; 18 19 import mutils.job_manager.debug_data; 20 import mutils.job_manager.debug_sink; 21 import mutils.job_manager.fiber_cache; 22 import mutils.job_manager.manager_utils; 23 import mutils.container_shared.shared_queue; 24 import mutils.job_manager.shared_utils; 25 26 27 alias JobVector=LowLockQueue!(JobDelegate*,bool); 28 //alias JobVector=LockedVector!(JobDelegate*); 29 //alias JobVector=LockedVectorBuildIn!(JobDelegate*); 30 31 alias FiberVector=LowLockQueue!(FiberData,bool); 32 //alias FiberVector=LockedVector!(FiberData); 33 //alias FiberVector=LockedVectorBuildIn!(FiberData); 34 35 36 //alias CacheVector=FiberNoCache; 37 //alias CacheVector=FiberOneCache; 38 //alias CacheVector=FiberVectorCache; 39 alias CacheVector=FiberTLSCache; 40 41 42 __gshared JobManager jobManager=new JobManager; 43 44 class JobManager{ 45 struct DebugHelper{ 46 align(64)shared uint jobsAdded; 47 align(64)shared uint jobsDone; 48 align(64)shared uint fibersAdded; 49 align(64)shared uint fibersDone; 50 51 void resetCounters(){ 52 { 53 atomicStore(jobsAdded, 0); 54 atomicStore(jobsDone, 0); 55 atomicStore(fibersAdded, 0); 56 atomicStore(fibersDone, 0); 57 } 58 } 59 void jobsAddedAdd (int num=1){ atomicOp!"+="(jobsAdded, num); } 60 void jobsDoneAdd (int num=1){ atomicOp!"+="(jobsDone, num); } 61 void fibersAddedAdd(int num=1){ atomicOp!"+="(fibersAdded,num); } 62 void fibersDoneAdd (int num=1){ atomicOp!"+="(fibersDone, num); } 63 64 65 66 } 67 DebugHelper debugHelper; 68 69 //jobs managment 70 private JobVector waitingJobs; 71 //fibers managment 72 private FiberVector[] waitingFibers; 73 //thread managment 74 private Thread[] threadPool; 75 bool exit; 76 77 78 private void initialize(uint threadsCount=0){ 79 if(threadsCount==0)threadsCount=threadsPerCPU; 80 if(threadsCount==0)threadsCount=4; 81 waitingFibers=Mallocator.instance.makeArray!(FiberVector)(threadsCount); 82 foreach(ref f;waitingFibers)f=Mallocator.instance.make!FiberVector; 83 threadPool=Mallocator.instance.makeArray!(Thread)(threadsCount); 84 foreach(i;0..threadsCount){ 85 Thread th=Mallocator.instance.make!Thread(&threadRunFunction); 86 //th.name=i.to!string; 87 threadPool[i]=th; 88 } 89 90 waitingJobs=Mallocator.instance.make!JobVector(); 91 fibersCache=Mallocator.instance.make!CacheVector(); 92 DebugSink.initializeShared(); 93 version(Android)rt_init(); 94 } 95 void start(){ 96 foreach(thread;threadPool){ 97 thread.start(); 98 } 99 } 100 void startMainLoop(void function() mainLoop,uint threadsCount=0){ 101 startMainLoop(mainLoop.toDelegate,threadsCount); 102 } 103 void startMainLoop(JobDelegate mainLoop,uint threadsCount=0){ 104 105 shared bool endLoop=false; 106 static struct NoGcDelegateHelper 107 { 108 JobDelegate del; 109 shared bool* endPointer; 110 111 this(JobDelegate del,ref shared bool end){ 112 this.del=del; 113 endPointer=&end; 114 } 115 116 void call() { 117 del(); 118 atomicStore(*endPointer,true); 119 } 120 } 121 NoGcDelegateHelper helper=NoGcDelegateHelper(mainLoop,endLoop); 122 initialize(threadsCount); 123 auto del=&helper.call; 124 start(); 125 addJob(&del); 126 waitForEnd(endLoop); 127 end(); 128 129 130 } 131 132 void waitForEnd(ref shared bool end){ 133 bool wait=true; 134 do{ 135 wait= !atomicLoad(end); 136 foreach(th;threadPool){ 137 if(!th.isRunning){ 138 wait=false; 139 } 140 } 141 Thread.sleep(10.msecs); 142 }while(wait); 143 } 144 void end(){ 145 exit=true; 146 foreach(thread;threadPool){ 147 thread.join; 148 } 149 version(Android)rt_close(); 150 151 } 152 153 size_t threadsNum(){ 154 return threadPool.length; 155 } 156 157 158 void addFiber(FiberData fiberData){ 159 assert(waitingFibers.length==threadPool.length); 160 assert(fiberData.fiber.state!=Fiber.State.TERM );//&& fiberData.fiber.state!=Fiber.State.EXEC 161 debugHelper.fibersAddedAdd(); 162 waitingFibers[fiberData.threadNum].add(fiberData);//range violation?? 163 } 164 //Only for debug and there it ma cause bugs 165 void addThisFiberAndYield(FiberData thisFiber){ 166 addFiber(thisFiber);//We add running fiber and 167 Fiber.yield();//wish that it wont be called before this yield 168 } 169 170 void addJob(JobDelegate* del){ 171 debugHelper.jobsAddedAdd(); 172 waitingJobs.add(del); 173 } 174 void addJobs(JobDelegate*[] dels){ 175 debugHelper.jobsAddedAdd(cast(int)dels.length); 176 waitingJobs.add(dels); 177 } 178 void addJobAndYield(JobDelegate* del){ 179 addJob(del); 180 Fiber.yield(); 181 } 182 void addJobsAndYield(JobDelegate*[] dels){ 183 addJobs(dels); 184 Fiber.yield(); 185 } 186 187 188 CacheVector fibersCache; 189 uint fibersMade; 190 191 Fiber allocateFiber(JobDelegate del){ 192 Fiber fiber; 193 fiber=fibersCache.getData(jobManagerThreadNum,cast(uint)threadPool.length); 194 assert(fiber.state==Fiber.State.TERM); 195 fiber.reset(del); 196 fibersMade++; 197 return fiber; 198 } 199 void deallocateFiber(Fiber fiber){ 200 fibersCache.removeData(fiber,jobManagerThreadNum,cast(uint)threadPool.length); 201 } 202 void runNextJob(){ 203 static int dummySink; 204 static int nothingToDoNum; 205 206 Fiber fiber; 207 FiberData fd=waitingFibers[jobManagerThreadNum].pop; 208 if(fd!=FiberData.init){ 209 fiber=fd.fiber; 210 debugHelper.fibersDoneAdd(); 211 }else if( !waitingJobs.empty ){ 212 JobDelegate* job; 213 job=waitingJobs.pop(); 214 if(job !is null){ 215 debugHelper.jobsDoneAdd(); 216 fiber=allocateFiber(*job); 217 } 218 } 219 //nothing to do 220 if(fiber is null ){ 221 nothingToDoNum++; 222 if(nothingToDoNum>50){ 223 Thread.sleep(10.usecs); 224 }else{ 225 foreach(i;0..uniform(0,20))dummySink+=uniform(1,2);//backoff 226 } 227 return; 228 } 229 //do the job 230 nothingToDoNum=0; 231 assert(fiber.state==Fiber.State.HOLD); 232 fiber.call(); 233 234 //reuse fiber 235 if(fiber.state==Fiber.State.TERM){ 236 deallocateFiber(fiber); 237 } 238 } 239 240 241 void threadRunFunction(){ 242 shared static int threadNumGenerator=-1; 243 jobManagerThreadNum=atomicOp!"+="(threadNumGenerator,1); 244 initializeDebugData(); 245 DebugSink.initialize(); 246 initializeFiberCache(); 247 while(!exit){ 248 runNextJob(); 249 } 250 deinitializeDebugData(); 251 DebugSink.deinitialize(); 252 } 253 254 } 255 256