1 /**
2 Module implements multithreated job manager with fibers (coroutines).
3 Thanks to fibers any task can be stopped in the middle of execution and started again by this manager.
4 Fibers are bound to one thread due to TLS issues and performance reasons.
5  */
6 module mutils.job_manager.manager_multithreated;
7 
8 import core.atomic;
9 import core.cpuid : threadsPerCPU;
10 import core.thread : Thread,ThreadID,Fiber;
11 import core.runtime;
12 
13 import std.conv : to;
14 import std.datetime;
15 import std.functional : toDelegate;
16 import std.random : uniform;
17 import std.stdio : write,writeln,writefln;
18 
19 import mutils.job_manager.debug_data;
20 import mutils.job_manager.debug_sink;
21 import mutils.job_manager.fiber_cache;
22 import mutils.job_manager.manager_utils;
23 import mutils.container_shared.shared_queue;
24 import mutils.job_manager.shared_utils;
25 
26 
// Queue type holding jobs that wait to be started.
// Alternative (locked) implementations are kept below for benchmarking.
alias JobVector=LowLockQueue!(JobDelegate*,bool);
//alias JobVector=LockedVector!(JobDelegate*);
//alias JobVector=LockedVectorBuildIn!(JobDelegate*);

// Queue type holding suspended fibers that wait to be resumed.
alias FiberVector=LowLockQueue!(FiberData,bool);
//alias FiberVector=LockedVector!(FiberData);
//alias FiberVector=LockedVectorBuildIn!(FiberData);


// Fiber cache implementation; the TLS variant is the one in use,
// the others are kept for comparison.
//alias CacheVector=FiberNoCache;
//alias CacheVector=FiberOneCache;
//alias CacheVector=FiberVectorCache;
alias CacheVector=FiberTLSCache;


// Process-wide singleton instance of the job manager.
__gshared JobManager jobManager=new JobManager;
43 
class JobManager{
	/// Cache-line-padded atomic counters tracking how many jobs/fibers were
	/// enqueued and completed. align(64) keeps each counter on its own cache
	/// line to avoid false sharing between worker threads.
	struct DebugHelper{
		align(64)shared uint jobsAdded;
		align(64)shared uint jobsDone;
		align(64)shared uint fibersAdded;
		align(64)shared uint fibersDone;
		
		/// Zeroes all four counters. Each store is atomic, but the reset as
		/// a whole is not one atomic operation.
		void resetCounters(){
			{
				atomicStore(jobsAdded, 0);
				atomicStore(jobsDone, 0);
				atomicStore(fibersAdded, 0);
				atomicStore(fibersDone, 0);
			}
		}
		void jobsAddedAdd  (int num=1){	 atomicOp!"+="(jobsAdded,  num); }
		void jobsDoneAdd   (int num=1){	 atomicOp!"+="(jobsDone,   num); }
		void fibersAddedAdd(int num=1){	 atomicOp!"+="(fibersAdded,num); }
		void fibersDoneAdd (int num=1){	 atomicOp!"+="(fibersDone, num); }

		
		
	}
	DebugHelper debugHelper;

	// Job management: queue of jobs that have not been started yet.
	private JobVector waitingJobs;
	// Fiber management: one queue per worker thread. A suspended fiber is
	// resumed only by the thread it is bound to (see addFiber/runNextJob).
	private FiberVector[] waitingFibers;
	// Thread management: worker threads created by initialize().
	private Thread[] threadPool;
	// Stop flag polled by worker threads in threadRunFunction.
	// NOTE(review): plain non-shared bool written by end() and read by other
	// threads without atomics — confirm this is intentional.
	bool exit;


	/// Allocates the queues, the fiber cache and the worker threads.
	/// threadsCount==0 means "use threadsPerCPU", falling back to 4 if that
	/// also reports 0. Threads are created but not started here.
	private void initialize(uint threadsCount=0){
		if(threadsCount==0)threadsCount=threadsPerCPU;
		if(threadsCount==0)threadsCount=4;
		waitingFibers=Mallocator.instance.makeArray!(FiberVector)(threadsCount);
		foreach(ref f;waitingFibers)f=Mallocator.instance.make!FiberVector;
		threadPool=Mallocator.instance.makeArray!(Thread)(threadsCount);
		foreach(i;0..threadsCount){
			Thread th=Mallocator.instance.make!Thread(&threadRunFunction);
			//th.name=i.to!string;
			threadPool[i]=th;
		}

		waitingJobs=Mallocator.instance.make!JobVector();
		fibersCache=Mallocator.instance.make!CacheVector();
		DebugSink.initializeShared();
		version(Android)rt_init();
	}
	/// Starts every worker thread in the pool.
	void start(){
		foreach(thread;threadPool){
			thread.start();
		}
	}
	/// Convenience overload: wraps a plain function pointer in a delegate.
	void startMainLoop(void function() mainLoop,uint threadsCount=0){
		startMainLoop(mainLoop.toDelegate,threadsCount);
	}
	/// Initializes the manager, runs mainLoop as the first job, blocks until
	/// mainLoop finishes (or a worker thread dies), then shuts down.
	void startMainLoop(JobDelegate mainLoop,uint threadsCount=0){
		
		shared bool endLoop=false;
		// Wraps the user delegate so that completion is signalled through a
		// shared flag without allocating a GC closure.
		static struct NoGcDelegateHelper
		{
			JobDelegate del;
			shared bool* endPointer;

			this(JobDelegate del,ref shared bool end){
				this.del=del;
				endPointer=&end;
			}

			void call() { 
				del();
				atomicStore(*endPointer,true);			
			}
		}
		NoGcDelegateHelper helper=NoGcDelegateHelper(mainLoop,endLoop);
		initialize(threadsCount);
		auto del=&helper.call;
		start();
		addJob(&del);
		waitForEnd(endLoop);
		end();

		
	}

	/// Polls every 10 ms until `end` becomes true or any worker thread has
	/// stopped running (e.g. it crashed).
	void waitForEnd(ref shared bool end){
		bool wait=true;
		do{
			wait= !atomicLoad(end);
			foreach(th;threadPool){
				if(!th.isRunning){
					wait=false;
				}
			}
			Thread.sleep(10.msecs);
		}while(wait);
	}
	/// Signals all workers to stop and joins them.
	void end(){
		exit=true;
		foreach(thread;threadPool){
			thread.join;
		}
		version(Android)rt_close();

	}

	/// Returns the number of worker threads in the pool.
	size_t threadsNum(){
		return threadPool.length;
	}

	
	/// Re-enqueues a suspended fiber on the queue of the thread it belongs
	/// to; that thread's runNextJob will resume it.
	void addFiber(FiberData fiberData){
		assert(waitingFibers.length==threadPool.length);
		assert(fiberData.fiber.state!=Fiber.State.TERM );//&& fiberData.fiber.state!=Fiber.State.EXEC
		debugHelper.fibersAddedAdd();
		waitingFibers[fiberData.threadNum].add(fiberData);//range violation?? NOTE(review): threadNum is not bounds-checked against waitingFibers.length here — confirm
	}
	/// Only for debugging; it may cause bugs: enqueues the *currently
	/// running* fiber and then yields, relying on the fiber not being
	/// resumed by another thread before the yield actually happens.
	void addThisFiberAndYield(FiberData thisFiber){
		addFiber(thisFiber);//We add the running fiber and
		Fiber.yield();//hope that it won't be resumed before this yield
	}

	/// Enqueues one job for execution on any worker thread.
	void addJob(JobDelegate* del){
		debugHelper.jobsAddedAdd();
		waitingJobs.add(del);
	}
	/// Enqueues a batch of jobs with a single queue operation.
	void addJobs(JobDelegate*[] dels){
		debugHelper.jobsAddedAdd(cast(int)dels.length);
		waitingJobs.add(dels);
	}
	/// Enqueues a job, then suspends the calling fiber.
	void addJobAndYield(JobDelegate* del){
		addJob(del);
		Fiber.yield();
	}
	/// Enqueues a batch of jobs, then suspends the calling fiber.
	void addJobsAndYield(JobDelegate*[] dels){
		addJobs(dels);
		Fiber.yield();
	}

	
	// Cache of terminated fibers, reused to avoid re-allocating fiber stacks
	// for every job.
	CacheVector fibersCache;
	uint fibersMade;	// total number of fibers handed out (statistics only)

	/// Fetches a terminated fiber from the cache and resets it to run `del`.
	Fiber allocateFiber(JobDelegate del){
		Fiber fiber;
		fiber=fibersCache.getData(jobManagerThreadNum,cast(uint)threadPool.length);
		assert(fiber.state==Fiber.State.TERM);
		fiber.reset(del);
		fibersMade++;
		return fiber;
	}
	/// Returns a terminated fiber to the cache for reuse.
	void deallocateFiber(Fiber fiber){
		fibersCache.removeData(fiber,jobManagerThreadNum,cast(uint)threadPool.length);
	}
	/// One scheduler step: prefer resuming a fiber waiting for this thread;
	/// otherwise start a new job from the shared queue; otherwise back off
	/// (brief random busy-work first, then 10 µs sleeps after 50 idle steps).
	void runNextJob(){
		static int dummySink;
		static int nothingToDoNum;
		
		Fiber fiber;
		FiberData fd=waitingFibers[jobManagerThreadNum].pop;
		if(fd!=FiberData.init){
			fiber=fd.fiber;
			debugHelper.fibersDoneAdd();
		}else if( !waitingJobs.empty ){
			JobDelegate* job;
			job=waitingJobs.pop();
			// pop may return null if another thread won the race after the
			// empty check above
			if(job !is null){
				debugHelper.jobsDoneAdd();
				fiber=allocateFiber(*job);
			}	
		}
		//nothing to do
		if(fiber is null ){
			nothingToDoNum++;
			if(nothingToDoNum>50){
				Thread.sleep(10.usecs);
			}else{
				foreach(i;0..uniform(0,20))dummySink+=uniform(1,2);//backoff: cheap busy-work before resorting to sleep
			}
			return;
		}
		//do the job
		nothingToDoNum=0;
		assert(fiber.state==Fiber.State.HOLD);
		fiber.call();

		//reuse fiber: TERM means the job finished (instead of yielding)
		if(fiber.state==Fiber.State.TERM){
			deallocateFiber(fiber);
		}
	}


	/// Worker thread entry point: assigns this thread a unique index
	/// (jobManagerThreadNum), sets up thread-local debug/fiber state, then
	/// runs scheduler steps until `exit` is set by end().
	void threadRunFunction(){
		shared static int threadNumGenerator=-1;
		jobManagerThreadNum=atomicOp!"+="(threadNumGenerator,1);
		initializeDebugData();
		DebugSink.initialize();
		initializeFiberCache();
		while(!exit){
			runNextJob();
		}
		deinitializeDebugData();
		DebugSink.deinitialize();
	}
	
}
255 
256