/**
Module contains basic structures for the job manager, e.g. FiberData, Counter.
It also contains structures/functions which extend the functionality of the job manager, like:
 - UniversalJob - job with parameters and return value
 - UniversalJobGroup - group of jobs
 - multithreated - makes foreach execute in parallel
 */
module mutils.job_manager.manager_utils;

import core.atomic;
import core.stdc.string : memset,memcpy;
import core.thread : Fiber;

import std.algorithm : map;
import std.experimental.allocator;
import std.experimental.allocator.mallocator;
import std.format : format;
import std.traits : Parameters;

import mutils.job_manager.manager;
import mutils.job_manager.utils;

uint jobManagerThreadNum;//thread local var
alias JobDelegate=void delegate();

/// Pair of a fiber and the value of jobManagerThreadNum captured together with it.
struct FiberData{
	Fiber fiber;
	uint threadNum;
}

/// Returns FiberData for the currently running fiber.
/// Must be called from inside a fiber (asserts that Fiber.getThis() is not null).
FiberData getFiberData(){
	Fiber fiber=Fiber.getThis();
	assert(fiber !is null);
	return FiberData(fiber,jobManagerThreadNum);
}

/// Atomic countdown. When the count reaches zero it is swapped to the
/// invalidCoun sentinel and the waiting fiber (if one was set) is handed
/// to jobManager.addFiber.
struct Counter{
	/// Sentinel stored in count once it has been decremented to zero.
	enum uint invalidCoun=10000;

	align (64)shared int count;
	align (64)FiberData waitingFiber;

	this(uint count){
		this.count=count;
	}

	/// True once decrement() has replaced a zero count with the sentinel.
	bool countedToZero(){
		return atomicLoad(count)==invalidCoun;
	}

	/// Decrements the count by one; the single caller whose compare-and-swap
	/// from 0 to the sentinel succeeds passes the waiting fiber to the job manager.
	void decrement(){
		// Guards against decrementing a counter that already holds the sentinel
		// (and against counts close to the sentinel value).
		assert(atomicLoad(count)<invalidCoun-1000);

		atomicOp!"-="(count, 1);
		// Only one caller can win this exchange, so the fiber is added at most once.
		bool ok=cas(&count,0,invalidCoun);
		if(ok && waitingFiber.fiber !is null){
			jobManager.addFiber(waitingFiber);
			//waitingFiber.fiber=null;//makes deadlock maybe atomicStore would help or it shows some bug??
			//atomicStore(waitingFiber.fiber,null);//has to be shared ignore for now
		}
	}
}

/// Job wrapping a delegate together with its arguments and (if any) its return value.
struct UniversalJob(Delegate){
	alias UnDelegate=UniversalDelegate!Delegate;
	UnDelegate unDel;//user function to run, with parameters and return value
	JobDelegate runDel;//wrapper to decrement counter on end
	Counter* counter;

	/// Calls the user delegate, stores its result and, when
	/// multithreatedManagerON is set, decrements the attached counter.
	void runWithCounter(){
		assert(counter !is null);
		unDel.callAndSaveReturn();
		static if(multithreatedManagerON)counter.decrement();
	}

	/// Calls the user delegate and then disposes this object.
	/// The object has to be allocated by Mallocator.
	void runAndDeleteMyself(){
		unDel.callAndSaveReturn();
		Mallocator.instance.dispose(&this);
	}

	/// Stores the delegate and the arguments it will be called with.
	void initialize(Delegate del,Parameters!(Delegate) args){
		unDel=makeUniversalDelegate!(Delegate)(del,args);
		//runDel=&run;
	}
}

//It is faster to add array of jobs
/// Fixed-size group of jobs of the same delegate type sharing one Counter.
/// Storage for the jobs is allocated with Mallocator in the constructor and
/// released in the destructor.
struct UniversalJobGroup(Delegate){
	alias UnJob=UniversalJob!(Delegate);
	Counter counter;
	uint jobsNum;
	uint jobsAdded;
	UnJob[] unJobs;
	JobDelegate*[] dels;

	@disable this();
	@disable this(this);

	/// Allocates storage for exactly jobsNum jobs.
	this(uint jobsNum){
		this.jobsNum=jobsNum;
		mallocatorAllocate();
	}

	~this(){
		mallocatorDeallocate();
	}

	/// Initializes the next free job slot; may be called at most jobsNum times.
	void add(Delegate del,Parameters!(Delegate) args){
		assert(unJobs.length>0 && jobsAdded<jobsNum);
		unJobs[jobsAdded].initialize(del,args);
		jobsAdded++;
	}

	deprecated("Use callAndWait")
	auto wait(){
		static if(UnJob.UnDelegate.hasReturn){
			return callAndWait();
		}else{
			callAndWait();
		}
	}

	//Returns data like getReturnData
	/// Registers the current fiber as the counter's waiting fiber, submits all
	/// jobs through jobManager.addJobsAndYield and, for delegates with a return
	/// value, returns getReturnData().
	auto callAndWait(){
		setUpJobs();
		counter.waitingFiber=getFiberData();
		jobManager.addJobsAndYield(dels);
		static if(UnJob.UnDelegate.hasReturn){
			return getReturnData();
		}
	}

	/// True when the shared counter has counted down to zero.
	bool areJobsDone(){
		return counter.countedToZero();
	}

	///Returns range so you can allocate it as you want
	///But remember: returned data lives as long as this object
	auto getReturnData()(){
		static assert(UnJob.UnDelegate.hasReturn);
		assert(areJobsDone);
		return unJobs.map!(a => a.unDel.result);
	}

	/// Submits all jobs through jobManager.addJobs without registering a
	/// waiting fiber; completion can be polled with areJobsDone().
	auto start(){
		setUpJobs();
		jobManager.addJobs(dels);
	}

private:
	/// Points every job at the shared counter and fills dels with pointers to
	/// the jobs' run delegates. All jobsNum jobs must have been added.
	auto setUpJobs(){
		assert(jobsAdded==jobsNum);
		counter.count=jobsNum;
		foreach(i,ref unJob;unJobs){
			unJob.counter=&counter;
			unJob.runDel=&unJob.runWithCounter;
			dels[i]=&unJob.runDel;
		}
	}

	void mallocatorAllocate(){
		unJobs=Mallocator.instance.makeArray!(UnJob)(jobsNum);
		dels=Mallocator.instance.makeArray!(JobDelegate*)(jobsNum);
	}

	void mallocatorDeallocate(){
		// Zero the storage before handing it back to the allocator.
		memset(unJobs.ptr,0,UnJob.sizeof*jobsNum);
		memset(dels.ptr,0,(JobDelegate*).sizeof*jobsNum);
		Mallocator.instance.dispose(unJobs);
		Mallocator.instance.dispose(dels);
	}
}

deprecated("Now UniversalJobGroup allcoates memory by itself. Delete call to this function.") string getStackMemory(string varName){
	return "";
}

/// Submits a single job built from del and args through
/// jobManager.addJobAndYield, with the current fiber registered as the
/// counter's waiting fiber. Returns the delegate's result if it has one.
/// The job and its counter are locals of this function.
auto callAndWait(Delegate)(Delegate del,Parameters!(Delegate) args){
	UniversalJob!(Delegate) unJob;
	unJob.initialize(del,args);
	unJob.runDel=&unJob.runWithCounter;
	Counter counter;
	counter.count=1;
	counter.waitingFiber=getFiberData();
	unJob.counter=&counter;
	jobManager.addJobAndYield(&unJob.runDel);
	static if(unJob.unDel.hasReturn){
		return unJob.unDel.result;
	}
}

/// Submits a single job through jobManager.addJob without waiting for it.
/// The job is allocated with Mallocator and disposes itself after running
/// (see UniversalJob.runAndDeleteMyself). Only delegates without a return
/// value are accepted.
auto callAndNothing(Delegate)(Delegate del,Parameters!(Delegate) args){
	alias UnJob=UniversalJob!(Delegate);
	// Checked on the type: the local job variable is not declared yet at this point.
	static assert(!UnJob.UnDelegate.hasReturn);
	UnJob* unJob=Mallocator.instance.make!(UnJob);
	unJob.initialize(del,args);
	unJob.runDel=&unJob.runAndDeleteMyself;
	jobManager.addJob(&unJob.runDel);
}

/// Wraps a slice so that foreach over the result runs the loop body through
/// the job manager. Slices shorter than 16 elements are iterated directly in
/// the caller; longer ones are split into 16 parts, each run as one job.
/// The loop body must not use break/return (the delegate must return 0).
auto multithreated(T)(T[] slice){

	static struct Tmp {
		import std.traits:ParameterTypeTuple;
		T[] array;
		int opApply(Dg)(scope Dg dg)
		{
			static assert (ParameterTypeTuple!Dg.length == 1 || ParameterTypeTuple!Dg.length == 2);
			enum hasI=ParameterTypeTuple!Dg.length == 2;
			static if(hasI)alias IType=ParameterTypeTuple!Dg[0];
			// Holds one part of the array together with the loop delegate, so
			// that its call() method can be submitted as a job.
			static struct NoGcDelegateHelper{
				Dg del;
				T[] arr;
				static if(hasI)IType iStart;

				void call() {
					foreach(int i,ref element;arr){
						static if(hasI){
							IType iSend=iStart+i;
							int result=del(iSend,element);
						}else{
							int result=del(element);
						}
						assert(result==0,"Cant use break, continue, itp in multithreated foreach");
					}
				}
			}
			enum partsNum=16;//constant number == easy usage of stack
			if(array.length<partsNum){
				foreach(int i,ref element;array){
					static if(hasI){
						// Pass the index through a variable of the delegate's own
						// index type, exactly like NoGcDelegateHelper.call does.
						IType iSend=i;
						int result=dg(iSend,element);
					}else{
						int result=dg(element);
					}
					assert(result==0,"Cant use break, continue, itp in multithreated foreach");

				}
			}else{
				NoGcDelegateHelper[partsNum] helpers;
				uint step=cast(uint)array.length/partsNum;

				alias ddd=void delegate();
				UniversalJobGroup!ddd group=UniversalJobGroup!ddd(partsNum);
				// The first partsNum-1 parts get exactly `step` elements each.
				foreach(int i;0..partsNum-1){
					helpers[i].del=dg;
					helpers[i].arr=array[i*step..(i+1)*step];
					static if(hasI)helpers[i].iStart=i*step;
					group.add(&helpers[i].call);
				}
				// The last part takes everything that is left, including the remainder.
				helpers[partsNum-1].del=dg;
				helpers[partsNum-1].arr=array[(partsNum-1)*step..array.length];
				static if(hasI)helpers[partsNum-1].iStart=(partsNum-1)*step;
				group.add(&helpers[partsNum-1].call);

				group.callAndWait();
			}
			return 0;

		}
	}
	Tmp tmp;
	tmp.array=slice;
	return tmp;
}