/**
Module implements a single-threaded job manager with fibers (coroutines).
Intended for debugging rather than for anything else.
 */
module mutils.job_manager.manager_singlethreated;

import std.functional : toDelegate;

import mutils.job_manager.fiber_cache;
import mutils.job_manager.manager_utils;
import mutils.job_manager.shared_utils;
import mutils.job_manager.utils;
import mutils.thread : Fiber;

/// Process-wide (non thread-local) job manager instance.
__gshared JobManager jobManager = new JobManager;

/**
Job manager that executes every job and fiber immediately on the calling
thread instead of queueing it.

The lifecycle methods (`initialize`, `start`, `waitForEnd`, `end`) have empty
bodies in this implementation.
 */
class JobManager {
	/// Plain counters describing how many jobs/fibers were added and finished.
	struct DebugHelper {
		uint jobsAdded;
		uint jobsDone;
		uint fibersAdded;
		uint fibersDone;

		/// Sets all four counters back to zero.
		void resetCounters() {
			jobsAdded = 0;
			jobsDone = 0;
			fibersAdded = 0;
			fibersDone = 0;
		}

		/// Adds `num` to `jobsAdded`.
		void jobsAddedAdd(int num = 1) {
			jobsAdded += num;
		}

		/// Adds `num` to `jobsDone`.
		void jobsDoneAdd(int num = 1) {
			jobsDone += num;
		}

		/// Adds `num` to `fibersAdded`.
		void fibersAddedAdd(int num = 1) {
			fibersAdded += num;
		}

		/// Adds `num` to `fibersDone`.
		void fibersDoneAdd(int num = 1) {
			fibersDone += num;
		}
	}

	/// Counters updated by the add* methods below.
	DebugHelper debugHelper;

	/// No-op here; `threadsCount` is accepted but not used.
	void initialize(uint threadsCount = 0) {
	}

	/// No-op here.
	void start() {
	}

	/// No-op here; `end` is neither read nor written.
	void waitForEnd(ref shared bool end) {
	}

	/// No-op here.
	void end() {
	}

	/**
	Convenience overload taking a plain function pointer.

	Converts `mainLoop` to a delegate and forwards it, together with
	`threadsCount`, to the delegate overload.
	 */
	void startMainLoop(void function() mainLoop, uint threadsCount = 0) {
		// Forward threadsCount as well so the caller's value is not silently dropped.
		startMainLoop(mainLoop.toDelegate, threadsCount);
	}

	/**
	Initializes the manager and runs `mainLoop` as a job.

	Params:
		mainLoop = delegate executed through `addJob`
		threadsCount = passed on to `initialize`, which currently ignores it
	 */
	void startMainLoop(JobDelegate mainLoop, uint threadsCount = 0) {
		initialize(threadsCount);
		// addJob calls the fiber before returning, so pointing at the
		// parameter is fine for the duration of this call.
		addJob(&mainLoop);
		//mainLoop();
	}

	/**
	Calls the fiber stored in `fiberData` right away.

	The fiber must be neither terminated nor currently executing (checked
	with an assert). The fiber counters are bumped before and after the call.
	 */
	void addFiber(FiberData fiberData) {
		assert(fiberData.fiber.state != Fiber.State.TERM
				&& fiberData.fiber.state != Fiber.State.EXEC);
		debugHelper.fibersAddedAdd();
		fiberData.fiber.call();
		debugHelper.fibersDoneAdd();
	}

	/**
	Only for debug: does not yield and does not touch `thisFiber`;
	it only increments the fibers added/done counters.
	 */
	void addThisFiberAndYield(FiberData thisFiber) {
		debugHelper.fibersAddedAdd();
		debugHelper.fibersDoneAdd();
	}

	/**
	Runs the job pointed to by `del` immediately on a fiber from the cache.

	If the fiber is in the TERM state after the call it is handed to
	`deallocateFiber`; otherwise it is left alone. `jobsDone` is incremented
	in both cases.
	 */
	void addJob(JobDelegate* del) {
		debugHelper.jobsAddedAdd();
		//printStack();
		Fiber fiber = allocateFiber(*del);
		assert(fiber.state != Fiber.State.TERM);
		fiber.call();
		if (fiber.state == Fiber.State.TERM) {
			deallocateFiber(fiber);
		}
		debugHelper.jobsDoneAdd();
	}

	/// Runs every job in `dels`, in order, via `addJob`.
	void addJobs(JobDelegate*[] dels) {
		foreach (del; dels) {
			addJob(del);
		}
	}

	/**
	Runs the job via `addJob`; no yield is performed here.
	Afterwards the fibers added/done counters are incremented.
	 */
	void addJobAndYield(JobDelegate* del) {
		addJob(del);
		debugHelper.fibersAddedAdd(); //Counter has started me
		debugHelper.fibersDoneAdd();
	}

	/**
	Runs all jobs via `addJobs`; no yield is performed here.
	Afterwards the fibers added/done counters are incremented once.
	 */
	void addJobsAndYield(JobDelegate*[] dels) {
		addJobs(dels);
		debugHelper.fibersAddedAdd(); //Counter has started me
		debugHelper.fibersDoneAdd();
	}

	/// Cache the job fibers are taken from and returned to.
	FiberTLSCache fibersCache;

	/**
	Takes a fiber from `fibersCache` and resets it to run `del`.

	The fiber obtained from the cache is asserted to be in the TERM state
	before it is reset.
	 */
	Fiber allocateFiber(JobDelegate del) {
		Fiber fiber;
		fiber = fibersCache.getData();
		assert(fiber.state == Fiber.State.TERM);
		fiber.reset(del);
		return fiber;
	}

	/// Hands `fiber` back to `fibersCache` via `removeData`.
	void deallocateFiber(Fiber fiber) {
		fibersCache.removeData(fiber);
	}

}