1 /**
2  Modules contains basic structures for job manager ex. FiberData, Counter. 
3  It also contains structures/functions which extens functionality of job manager like:
4  - UniversalJob - job with parameters and return value
5  - UniversalJobGroup - group of jobs 
6  - multithreated - makes foreach execute in parallel
7  */
8 module mutils.job_manager.manager_utils;
9 
10 import core.atomic;
11 import core.stdc.string : memset,memcpy;
12 import core.thread : Fiber;
13 
14 import std.algorithm : map;
15 import std.experimental.allocator;
16 import std.experimental.allocator.mallocator;
17 import std.format : format;
18 import std.traits : Parameters;
19 
20 import mutils.job_manager.manager;
21 import mutils.job_manager.utils;
22 
uint jobManagerThreadNum;//thread local var: index of the worker thread running this code
alias JobDelegate=void delegate();//signature every schedulable job is reduced to
25 
///Handle to a suspended fiber plus the number of the worker thread it
///belongs to, so the job manager can resume it on the right thread.
struct FiberData{
	Fiber fiber;
	uint threadNum;
}
30 
///Captures the currently executing fiber together with the current
///worker-thread number as a FiberData handle.
///Must be called from inside a fiber (asserts otherwise).
FiberData getFiberData(){
	auto currentFiber=Fiber.getThis();
	assert(currentFiber !is null);
	FiberData data;
	data.fiber=currentFiber;
	data.threadNum=jobManagerThreadNum;
	return data;
}
36 
///Atomic countdown used to detect when a batch of jobs has finished.
///When the count reaches zero it is swapped to the sentinel `invalidCoun`
///and the fiber stored in `waitingFiber` (if any) is rescheduled.
struct Counter{
	///Sentinel value stored in `count` once it has counted down to zero.
	enum uint invalidCoun=10000;

	// align(64): keep the hot atomic counter and the fiber data on
	// separate cache lines — presumably to avoid false sharing; confirm.
	align (64)shared int count;
	align (64)FiberData waitingFiber;


	this(uint count){
		this.count=count;
	}

	///True once decrement() has swapped the zeroed count to the sentinel.
	bool countedToZero(){
		return atomicLoad(count)==invalidCoun;
	}

	///Called by each finishing job; the thread that wins the CAS at zero
	///wakes the waiting fiber (if one was registered).
	void decrement(){
		// Sanity guard: a live count must stay well below the sentinel
		// region, otherwise the counter was already marked as done.
		assert(atomicLoad(count)<invalidCoun-1000);
		
		atomicOp!"-="(count, 1);
		// Exactly one thread can succeed in swapping 0 -> invalidCoun.
		bool ok=cas(&count,0,invalidCoun);
		if(ok && waitingFiber.fiber !is null){
			jobManager.addFiber(waitingFiber);
			//waitingFiber.fiber=null;//makes deadlock maybe atomicStore would help or it shows some bug??
			//atomicStore(waitingFiber.fiber,null);//has to be shared ignore for now
		}
		
		
	}
}
66 
///Wraps a user delegate together with its arguments and return slot so the
///job manager can execute it through a plain `void delegate()` (runDel).
struct UniversalJob(Delegate){
	alias UnDelegate=UniversalDelegate!Delegate;
	UnDelegate unDel;//user function to run, with parameters and return value
	JobDelegate runDel;//wrapper to decrement counter on end
	Counter* counter;//shared completion counter; set by the owning group/caller
	///Runs the user delegate, then signals completion on the counter.
	void runWithCounter(){
		assert(counter !is null);
		unDel.callAndSaveReturn();
		static if(multithreatedManagerON)counter.decrement();
	}
	//has to be allocated by Mallocator: frees itself after running
	void runAndDeleteMyself(){
		unDel.callAndSaveReturn();
		Mallocator.instance.dispose(&this);
	}
	
	///Stores the delegate and its call arguments for later execution.
	void initialize(Delegate del,Parameters!(Delegate) args){
		unDel=makeUniversalDelegate!(Delegate)(del,args);
		//runDel=&run;
	}
	
}
89 
//It is faster to add array of jobs
///Fixed-capacity batch of jobs sharing one Counter. Construct with the
///exact job count, `add` each job, then either `callAndWait` (blocks the
///current fiber) or `start` followed by `areJobsDone` polling.
struct UniversalJobGroup(Delegate){
	alias UnJob=UniversalJob!(Delegate);
	Counter counter;
	uint jobsNum;//capacity, fixed at construction
	uint jobsAdded;//number of jobs registered so far via add()
	UnJob[] unJobs;//Mallocator-backed job storage
	JobDelegate*[] dels;//pointers to each job's runDel, handed to the manager

	@disable this();
	@disable this(this);//non-copyable: jobs store pointers into this object

	this(uint jobsNum){
		this.jobsNum=jobsNum;
		mallocatorAllocate();
	}

	~this(){
		mallocatorDeallocate();
	}

	///Registers one job; callable at most jobsNum times.
	void add(Delegate del,Parameters!(Delegate) args){
		assert(unJobs.length>0 && jobsAdded<jobsNum);
		unJobs[jobsAdded].initialize(del,args);
		jobsAdded++;
	}

	deprecated("Use callAndWait")
	auto wait(){
		static if(UnJob.UnDelegate.hasReturn){
			return callAndWait();
		}else{
			callAndWait();
		}
	}

	//Returns data like getReturnData
	///Dispatches all jobs and yields this fiber until they complete.
	auto callAndWait(){
		setUpJobs();
		counter.waitingFiber=getFiberData();
		jobManager.addJobsAndYield(dels);
		static if(UnJob.UnDelegate.hasReturn){
			return getReturnData();
		}		
	}
	
	///Non-blocking completion check (for use after start()).
	bool areJobsDone(){
		return counter.countedToZero();	
	}

	///Returns range so you can allocate it as you want
	///But remember: returned data lives as long as this object
	auto getReturnData()(){
		static assert(UnJob.UnDelegate.hasReturn);
		assert(areJobsDone);
		return unJobs.map!(a => a.unDel.result);
	}

	///Dispatches all jobs without blocking; poll areJobsDone() later.
	auto start(){
		setUpJobs();
		jobManager.addJobs(dels);		
	}

private:
	//Wires every job to the shared counter and collects the run-delegate
	//pointers. Requires that exactly jobsNum jobs were added.
	auto setUpJobs(){
		assert(jobsAdded==jobsNum);
		counter.count=jobsNum;
		foreach(i,ref unJob;unJobs){
			unJob.counter=&counter;
			unJob.runDel=&unJob.runWithCounter;
			dels[i]=&unJob.runDel;
		}		
	}

	void mallocatorAllocate(){
		unJobs=Mallocator.instance.makeArray!(UnJob)(jobsNum);
		dels=Mallocator.instance.makeArray!(JobDelegate*)(jobsNum);
	}

	void mallocatorDeallocate(){
		//NOTE(review): zeroing before dispose looks like a debugging aid to
		//surface use-after-free; presumably safe since UnJob has no dtor — confirm.
		memset(unJobs.ptr,0,UnJob.sizeof*jobsNum);
		memset(dels.ptr,0,(JobDelegate*).sizeof*jobsNum);
		Mallocator.instance.dispose(unJobs);
		Mallocator.instance.dispose(dels);
	}
}
176 
///Deprecated no-op kept only for source compatibility: UniversalJobGroup now
///manages its own memory. Always returns an empty mixin string so old
///`mixin(getStackMemory(...))` call sites still compile.
///Fixed typo ("allcoates") in the user-facing deprecation message.
deprecated("Now UniversalJobGroup allocates memory by itself. Delete call to this function.") string getStackMemory(string varName){	
	return "";
}
180 
181 
182 
183 
184 
185 
///Executes the delegate as a single job on the job manager and blocks the
///current fiber until it finishes. Must be called from inside a
///job-manager fiber (getFiberData asserts on that).
///Returns the delegate's result, if it has one.
auto callAndWait(Delegate)(Delegate del,Parameters!(Delegate) args){
	UniversalJob!(Delegate) unJob;
	unJob.initialize(del,args);
	unJob.runDel=&unJob.runWithCounter;
	// Counter and job live on this fiber's stack; that is safe because the
	// fiber yields below and is only resumed after the job decremented the counter.
	Counter counter;
	counter.count=1;
	counter.waitingFiber=getFiberData();
	unJob.counter=&counter;
	jobManager.addJobAndYield(&unJob.runDel);
	static if(unJob.unDel.hasReturn){
		return unJob.unDel.result;
	}
}
199 
///Fire-and-forget: schedules the delegate as a job on the job manager and
///returns immediately. The job is heap-allocated with Mallocator and frees
///itself after running (runAndDeleteMyself), so the delegate must not
///return a value.
auto callAndNothing(Delegate)(Delegate del,Parameters!(Delegate) args){
	alias UnJob=UniversalJob!(Delegate);
	//BUG FIX: the static assert previously referenced the local `unJob`
	//before its declaration, which fails to compile on instantiation;
	//check the trait on the type instead.
	static assert(!UnJob.UnDelegate.hasReturn);
	UnJob* unJob=Mallocator.instance.make!(UnJob);
	unJob.initialize(del,args);
	unJob.runDel=&unJob.runAndDeleteMyself;
	jobManager.addJob(&unJob.runDel);
}
207 
///Wraps a slice so that `foreach` over the result executes the loop body in
///parallel on the job manager. Supports `foreach(el; ...)` and
///`foreach(i, el; ...)` shapes. Iteration order across chunks is not
///sequential, and break/continue are not supported (asserted at runtime).
auto multithreated(T)(T[] slice){
	
	static struct Tmp {
		import std.traits:ParameterTypeTuple;
		T[] array;
		int opApply(Dg)(scope Dg dg)
		{ 
			//Only 1-arg (element) or 2-arg (index, element) foreach bodies.
			static assert (ParameterTypeTuple!Dg.length == 1 || ParameterTypeTuple!Dg.length == 2);
			enum hasI=ParameterTypeTuple!Dg.length == 2;
			static if(hasI)alias IType=ParameterTypeTuple!Dg[0];
			//GC-free closure carrying the loop body, one chunk of the array,
			//and that chunk's starting index into a worker job.
			static struct NoGcDelegateHelper{
				Dg del;
				T[] arr;
				static if(hasI)IType iStart;
				
				void call() { 
					foreach(int i,ref element;arr){
						static if(hasI){
							IType iSend=iStart+i;
							int result=del(iSend,element);
						}else{
							int result=del(element);
						}
						//Non-zero result means the body tried to break/goto,
						//which cannot work across parallel chunks.
						assert(result==0,"Cant use break, continue, itp in multithreated foreach");
					}	
				}
			}
			enum partsNum=16;//constant number == easy usage of stack
			if(array.length<partsNum){
				//Too little work to split up: run sequentially in-place.
				foreach(int i,ref element;array){
					static if(hasI){
						int result=dg(i,element);
					}else{
						int result=dg(element);
					}
					assert(result==0,"Cant use break, continue, itp in multithreated foreach");
					
				}
			}else{
				//Split into partsNum chunks; the last chunk absorbs the remainder.
				NoGcDelegateHelper[partsNum] helpers;
				uint step=cast(uint)array.length/partsNum;
				
				alias ddd=void delegate();
				UniversalJobGroup!ddd group=UniversalJobGroup!ddd(partsNum);
				foreach(int i;0..partsNum-1){
					helpers[i].del=dg;
					helpers[i].arr=array[i*step..(i+1)*step];
					static if(hasI)helpers[i].iStart=i*step;
					group.add(&helpers[i].call);
				}
				helpers[partsNum-1].del=dg;
				helpers[partsNum-1].arr=array[(partsNum-1)*step..array.length];
				static if(hasI)helpers[partsNum-1].iStart=(partsNum-1)*step;
				group.add(&helpers[partsNum-1].call);
				
				//Blocks the current fiber until every chunk has finished.
				group.callAndWait();
			}
			return 0;
			
		}
	}
	Tmp tmp;
	tmp.array=slice;
	return tmp;
}
273 
274 
275 
276 
277 
278