1 /**
2 Module implements singlethreated job manager with fibers (coroutines).
3 Rather for debugging than for anything else.
4  */
5 module mutils.job_manager.manager_singlethreated;
6 
7 import core.thread : Fiber;
8 
9 import std.functional : toDelegate;
10 import std.stdio : write,writeln,writefln;
11 
12 import mutils.job_manager.fiber_cache;
13 import mutils.job_manager.manager_utils;
14 import mutils.job_manager.shared_utils;
15 import mutils.job_manager.utils;
16 
17 //alias CacheVector=FiberNoCache;
18 //alias CacheVector=FiberOneCache;
19 //alias CacheVector=FiberVectorCache;
20 alias CacheVector=FiberTLSCache;
21 
22 
23 
24 __gshared JobManager jobManager=new JobManager;
25 
26 class JobManager{
27 	struct DebugHelper{
28 		uint jobsAdded;
29 		uint jobsDone;
30 		uint fibersAdded;
31 		uint fibersDone;
32 		
33 		void resetCounters(){
34 			jobsAdded=0;
35 			jobsDone=0;
36 			fibersAdded=0;
37 			fibersDone=0;			
38 		}
39 		void jobsAddedAdd  (int num=1){	 jobsAdded+=  num; }
40 		void jobsDoneAdd   (int num=1){	 jobsDone+=  num;}
41 		void fibersAddedAdd(int num=1){	 fibersAdded+=  num; }
42 		void fibersDoneAdd (int num=1){	 fibersDone+=  num; }		
43 	}
44 
45 	DebugHelper debugHelper;
46 	void initialize(uint threadsCount=0){
47 		fibersCache=Mallocator.instance.make!CacheVector();
48 	}
49 
50 	void start(){}	
51 	void waitForEnd(ref shared bool end){}
52 	void end(){}
53 
54 	void startMainLoop(void function() mainLoop,uint threadsCount=0){
55 		startMainLoop(mainLoop.toDelegate);
56 	}
57 
58 	void startMainLoop(JobDelegate mainLoop,uint threadsCount=0){
59 		initialize();
60 		addJob(&mainLoop);
61 		//mainLoop();
62 	}
63 
64 	
65 	void addFiber(FiberData fiberData){
66 		assert(fiberData.fiber.state!=Fiber.State.TERM && fiberData.fiber.state!=Fiber.State.EXEC);
67 		debugHelper.fibersAddedAdd();
68 		fiberData.fiber.call();
69 		debugHelper.fibersDoneAdd();
70 	}
71 
72 	//Only for debug 
73 	void addThisFiberAndYield(FiberData thisFiber){
74 		debugHelper.fibersAddedAdd();
75 		debugHelper.fibersDoneAdd();
76 	}
77 
78 	void addJob(JobDelegate* del){
79 		debugHelper.jobsAddedAdd();
80 		//printStack();
81 		Fiber fiber=allocateFiber(*del);
82 		assert(fiber.state!=Fiber.State.TERM);
83 		fiber.call();
84 		if(fiber.state==Fiber.State.TERM){
85 			deallocateFiber(fiber);
86 		}
87 		debugHelper.jobsDoneAdd();
88 	}
89 
90 	void addJobs(JobDelegate*[] dels){
91 		foreach(del;dels){
92 			addJob(del);
93 		}
94 	}
95 
96 	void addJobAndYield(JobDelegate* del){
97 		addJob(del);
98 		debugHelper.fibersAddedAdd();//Counter have started me
99 		debugHelper.fibersDoneAdd();
100 	}
101 
102 	void addJobsAndYield(JobDelegate*[] dels){
103 		addJobs(dels);
104 		debugHelper.fibersAddedAdd();//Counter have started me
105 		debugHelper.fibersDoneAdd();
106 	}
107 
108 	CacheVector fibersCache;
109 	uint fibersMade;
110 	Fiber allocateFiber(JobDelegate del){
111 		Fiber fiber;
112 		fiber=fibersCache.getData(0,1);
113 		assert(fiber.state==Fiber.State.TERM);
114 		fiber.reset(del);
115 		fibersMade++;
116 		return fiber;
117 	}
118 
119 	void deallocateFiber(Fiber fiber){
120 		fibersCache.removeData(fiber,0,1);
121 	}
122 	
123 }