1 module mutils.plugin.safe_executor;
2 
3 import core.stdc.signal;
4 import core.sync.condition;
5 import core.sync.mutex;
6 import core.sync.semaphore;
7 import core.thread;
8 
9 
10 import std.experimental.allocator;
11 import std.experimental.allocator.mallocator;
12 import std.stdio;
13 import std.typecons;
14 
15 import mutils.plugin.safe_executor;
16 import mutils.container_shared.shared_queue;
17 
// Hand-written declaration of the native "terminate the calling thread"
// primitive for the platform being compiled; any other platform is rejected
// at compile time.
version(Posix){
	extern(C) void pthread_exit(void *value_ptr)  nothrow @nogc @system;
}else version(Windows){
	extern(Windows) void ExitThread(uint dwExitCode)  nothrow @nogc @system;
}else{
	static assert(false, "Platform not supported");
}

/// Terminates the calling thread through the native API:
/// pthread_exit(null) on Posix, ExitThread(0) on Windows.
void thisThreadExit() nothrow @nogc @system{
	version(Posix){
		pthread_exit(null);
	}else version(Windows){
		ExitThread(0);
	}
}
31 
32 
/**
 * C-linkage signal handler used to contain a crashing plugin.
 *
 * Prints the number of the received signal and then terminates the calling
 * thread via thisThreadExit().
 *
 * Params:
 *     sig = number of the signal that was delivered
 */
extern(C) void crashSignalHandle(int sig) nothrow @nogc @system
{
	// NOTE(review): printf is not on the POSIX async-signal-safe list - confirm this is acceptable here.
	printf("Plugin crash. Signal number: %d.\n", sig);

	// Stop only the current thread instead of letting the signal end the process.
	thisThreadExit();
}
37 
/**
 * Runs plugin functions on a small pool of worker threads so that a crash
 * inside a plugin (SIGABRT/SIGSEGV, see crashSignalHandle) terminates only the
 * worker thread that was running it.
 *
 * Usage: call initialize() once, then execute() for every plugin call, and
 * end() when the executor is no longer needed.
 */
struct SafeExecutor
{
	/// Signature of a function that can be executed by the pool.
	alias PluginFunction=void function();
	/// Queue of pending jobs shared between callers of execute() and the workers.
	alias Queue=LowLockQueue!(PluginExecHandle*);

	/// One pending/running job. It lives on the stack of the execute() call
	/// that created it, so workers must not touch it after reporting completion.
	struct PluginExecHandle{
		/// Function to run.
		PluginFunction fn;
		/// Condition (and, through it, the mutex) the caller of execute() waits on.
		Condition condition;
		/// Worker that picked the job up; null while the job is still queued.
		Thread thread;
		/// Set by the worker, under the condition's mutex, once fn has returned.
		bool done=false;
	}

	/// Number of worker threads in the pool.
	enum pluginsThreadsNum=10;
	/// Set by end() to ask the workers to leave their main loop.
	bool exit=false;
	Thread[pluginsThreadsNum] threads;
	Queue queue;

	/// Posted whenever a job is queued (and on shutdown) to wake idle workers.
	Semaphore semaphore;

	/// Installs crashSignalHandle for SIGABRT and SIGSEGV. Only the first call
	/// has an effect; on Android nothing is installed.
	static void initializeCrashSignalCatching(){
		__gshared bool initialized=false;
		if(initialized){
			return;
		}
		initialized=true;
		//There is no signal function before android api 21
		version(Android){}else{
			signal(SIGABRT,&crashSignalHandle);
			signal(SIGSEGV,&crashSignalHandle);
		}
	}

	/// Installs the crash handlers, allocates the queue and the semaphore and
	/// starts all worker threads.
	void initialize(){
		initializeCrashSignalCatching();
		queue=Mallocator.instance.make!Queue();
		semaphore = Mallocator.instance.make!Semaphore(0);
		foreach(ref th;threads){
			th=Mallocator.instance.make!Thread(&threadMain);
			th.start();
		}
	}

	/// Asks all workers to stop, waits for them and releases everything that
	/// initialize() allocated.
	void end(){
		exit=true;
		// Wake idle workers right away so join() does not have to wait for
		// their semaphore timeout to expire.
		foreach(i;0..pluginsThreadsNum){
			semaphore.notify();
		}
		foreach(ref th;threads){
			th.join();
			Mallocator.instance.dispose(th);
		}
		Mallocator.instance.dispose(queue);
		Mallocator.instance.dispose(semaphore);
	}

	/// Replaces `thread` in the pool with a freshly started worker, but only
	/// if `thread` belongs to the pool and is no longer running.
	void renewThread(Thread thread){
		foreach(ref th;threads){
			// Identity comparison: we are looking for this exact Thread object.
			if(th is thread){
				if(!th.isRunning()){
					Mallocator.instance.dispose(th);
					th=Mallocator.instance.make!Thread(&threadMain);
					th.start();
				}
				break;
			}
		}
	}
	
	/**
	 * Queues `fn` for execution on a worker thread and blocks until it has
	 * finished or the worker running it has terminated.
	 *
	 * Params:
	 *     fn = plugin function to run
	 *
	 * Returns: true when `fn` ran to completion, false when the worker that
	 * picked it up stopped running before reporting completion (in which case
	 * the worker is replaced through renewThread).
	 */
	bool execute(PluginFunction fn){
		auto mutexScoped=scoped!Mutex();
		Mutex mutex=mutexScoped;
		auto condition=scoped!Condition(mutex);
		PluginExecHandle handle=PluginExecHandle(fn,condition);
		queue.add(&handle);
		semaphore.notify();

		// Short timeout: a crashed worker never notifies, so its state has to be polled.
		auto waitTime=1.msecs;
		synchronized( mutex ){
			// Loop on the predicate rather than on the result of wait(): this
			// handles spurious wakeups and a job that finished before we got here.
			while(!handle.done){
				handle.condition.wait(waitTime);
				if(handle.done){
					break;
				}
				Thread worker=handle.thread;
				if(worker !is null && !worker.isRunning()){
					//Thread terminated (crashed)
					renewThread(worker);
					return false;
				}
				//Otherwise the handle is still queued or the thread is still doing the job
			}
			//Job done
			return true;
		}
	}
	
	/// Worker loop: pops jobs from the queue and runs them until `exit` is
	/// set; sleeps on the semaphore (at most 100 ms at a time) when idle.
	void threadMain(){
		while(!exit){
			PluginExecHandle* handle=queue.pop;
			if(handle !is null){
				Mutex waiterMutex=handle.condition.mutex;
				synchronized(waiterMutex){
					handle.thread=Thread.getThis();
				}
				handle.fn();
				// Report completion while holding the waiter's mutex and set
				// `done` before notifying. The waiter cannot return from
				// execute() (destroying the stack-allocated handle) until the
				// lock is released, and the handle is not touched afterwards.
				synchronized(waiterMutex){
					handle.done=true;
					handle.condition.notify();
				}
			}else{
				semaphore.wait(100.msecs);
			}

		}
	}
	
}
156