/**
Module implements a single-threaded job manager built on fibers (coroutines).
Intended for debugging rather than for production use.
 */
module mutils.job_manager.manager_singlethreated;

import core.thread : Fiber;

import std.functional : toDelegate;
import std.stdio : write,writeln,writefln;

import mutils.job_manager.fiber_cache;
import mutils.job_manager.manager_utils;
import mutils.job_manager.shared_utils;
import mutils.job_manager.utils;

// Fiber cache implementation used by the manager; alternatives kept for reference.
//alias CacheVector=FiberNoCache;
//alias CacheVector=FiberOneCache;
//alias CacheVector=FiberVectorCache;
alias CacheVector=FiberTLSCache;

/// Global job manager instance.
__gshared JobManager jobManager=new JobManager;

/**
Job manager that executes every job synchronously on the calling thread.

Each job is run inside a fiber taken from `fibersCache` at the moment it is
added; nothing is queued. `initialize` (or `startMainLoop`, which calls it)
must run before any job is added, because it creates the fiber cache.
 */
class JobManager{
	/// Plain counters of how many jobs and fibers were added and finished.
	struct DebugHelper{
		uint jobsAdded;
		uint jobsDone;
		uint fibersAdded;
		uint fibersDone;

		/// Sets all four counters back to zero.
		void resetCounters(){
			jobsAdded=0;
			jobsDone=0;
			fibersAdded=0;
			fibersDone=0;
		}

		/// Adds `num` to `jobsAdded`.
		void jobsAddedAdd  (int num=1){ jobsAdded+=  num; }
		/// Adds `num` to `jobsDone`.
		void jobsDoneAdd   (int num=1){ jobsDone+=   num; }
		/// Adds `num` to `fibersAdded`.
		void fibersAddedAdd(int num=1){ fibersAdded+= num; }
		/// Adds `num` to `fibersDone`.
		void fibersDoneAdd (int num=1){ fibersDone+=  num; }
	}

	/// Statistics updated by the add* methods below.
	DebugHelper debugHelper;

	/**
	Creates the fiber cache.

	Params:
		threadsCount = unused here; there are no worker threads in this manager.
	 */
	void initialize(uint threadsCount=0){
		fibersCache=Mallocator.instance.make!CacheVector();
	}

	/// No-op in this manager.
	void start(){}
	/// No-op in this manager; `end` is neither read nor written.
	void waitForEnd(ref shared bool end){}
	/// No-op in this manager.
	void end(){}

	/**
	Convenience overload: wraps a function pointer in a delegate and forwards
	to the delegate overload.

	Params:
		mainLoop     = function to run as the first job
		threadsCount = forwarded unchanged
	 */
	void startMainLoop(void function() mainLoop,uint threadsCount=0){
		startMainLoop(mainLoop.toDelegate,threadsCount);
	}

	/**
	Initializes the manager and runs `mainLoop` as a job.

	Because jobs execute synchronously, this returns once `mainLoop` has
	finished or its fiber has yielded.

	Params:
		mainLoop     = delegate to run as the first job
		threadsCount = forwarded to `initialize`, which ignores it
	 */
	void startMainLoop(JobDelegate mainLoop,uint threadsCount=0){
		initialize(threadsCount);
		// Taking the address of the parameter is fine: addJob copies the
		// delegate out of the pointer before this function returns.
		addJob(&mainLoop);
		//mainLoop();
	}


	/**
	Resumes the given fiber immediately on the calling thread.

	Params:
		fiberData = holder of the fiber to resume; the fiber must be neither
		            terminated nor currently executing
	 */
	void addFiber(FiberData fiberData){
		assert(fiberData.fiber.state!=Fiber.State.TERM && fiberData.fiber.state!=Fiber.State.EXEC);
		debugHelper.fibersAddedAdd();
		fiberData.fiber.call();
		debugHelper.fibersDoneAdd();
	}

	/**
	Only for debug: updates the fiber counters and returns.
	Despite the name, no yield is performed and `thisFiber` is not used.
	 */
	void addThisFiberAndYield(FiberData thisFiber){
		debugHelper.fibersAddedAdd();
		debugHelper.fibersDoneAdd();
	}

	/**
	Runs a job right away inside a fiber taken from the cache.

	The fiber is handed back to the cache when it ends in the terminated
	state, including when the job throws (the exception still propagates to
	the caller). A fiber that merely yielded is left alone.

	Params:
		del = pointer to the job delegate; it is dereferenced and copied
		      before the job starts
	 */
	void addJob(JobDelegate* del){
		debugHelper.jobsAddedAdd();
		//printStack();
		Fiber fiber=allocateFiber(*del);
		assert(fiber.state!=Fiber.State.TERM);
		{
			// Fiber.call() rethrows an exception raised by the job. Without
			// this guard such a fiber would never be returned to the cache.
			scope(exit){
				if(fiber.state==Fiber.State.TERM){
					deallocateFiber(fiber);
				}
			}
			fiber.call();
		}
		debugHelper.jobsDoneAdd();
	}

	/// Runs every job in `dels`, in order, via `addJob`.
	void addJobs(JobDelegate*[] dels){
		foreach(del;dels){
			addJob(del);
		}
	}

	/**
	Runs the job via `addJob` and then updates the fiber counters.
	Despite the name, no yield is performed.
	 */
	void addJobAndYield(JobDelegate* del){
		addJob(del);
		debugHelper.fibersAddedAdd();// The Counter has started me
		debugHelper.fibersDoneAdd();
	}

	/**
	Runs the jobs via `addJobs` and then updates the fiber counters.
	Despite the name, no yield is performed.
	 */
	void addJobsAndYield(JobDelegate*[] dels){
		addJobs(dels);
		debugHelper.fibersAddedAdd();// The Counter has started me
		debugHelper.fibersDoneAdd();
	}

	/// Cache that fibers are taken from and returned to; created by `initialize`.
	CacheVector fibersCache;
	/// Number of `allocateFiber` calls made so far.
	uint fibersMade;

	/**
	Takes a fiber from the cache and re-targets it at `del`.

	The cache is expected to hand out fibers in the terminated state, which is
	what `Fiber.reset` requires.

	Params:
		del = delegate the fiber will execute when called
	Returns: the fiber, ready to be started with `call()`
	 */
	Fiber allocateFiber(JobDelegate del){
		// NOTE(review): the meaning of the (0,1) arguments is defined by the
		// cache implementation, which is not visible in this file.
		Fiber fiber=fibersCache.getData(0,1);
		assert(fiber.state==Fiber.State.TERM);
		fiber.reset(del);
		fibersMade++;
		return fiber;
	}

	/// Returns `fiber` to the cache.
	void deallocateFiber(Fiber fiber){
		fibersCache.removeData(fiber,0,1);
	}

}