module mutils.plugin.safe_executor;

import core.stdc.signal;
import core.sync.condition;
import core.sync.mutex;
import core.sync.semaphore;
import core.thread;


import std.experimental.allocator;
import std.experimental.allocator.mallocator;
import std.stdio;
import std.typecons;

import mutils.plugin.safe_executor;
import mutils.container_shared.shared_queue;

version(Posix){
	extern(C) void pthread_exit(void *value_ptr) nothrow @nogc @system;
	/// Terminates the calling thread only; the rest of the process keeps running.
	void thisThreadExit() nothrow @nogc @system{
		pthread_exit(null);
	}
}else version(Windows){
	extern(Windows) void ExitThread(uint dwExitCode) nothrow @nogc @system;
	/// Terminates the calling thread only; the rest of the process keeps running.
	void thisThreadExit() nothrow @nogc @system{
		ExitThread(0);
	}
}else{
	static assert(false, "Platform not supported");
}


/// Signal handler installed for SIGABRT and SIGSEGV by
/// SafeExecutor.initializeCrashSignalCatching. It reports the signal number and
/// then ends the thread that received the signal instead of the whole process.
// NOTE(review): printf is not on the POSIX list of async-signal-safe functions;
// kept as-is because the message is purely diagnostic - confirm this is acceptable.
extern(C) void crashSignalHandle(int sig) nothrow @nogc @system{
	printf("Plugin crash. Signal number: %d.\n",sig);
	thisThreadExit();
}

/// Runs plugin functions on a fixed pool of worker threads. If a plugin function
/// raises SIGSEGV or SIGABRT, the signal handler terminates only that worker
/// thread; execute() detects the dead thread, replaces it and returns false.
struct SafeExecutor
{
	alias PluginFunction=void function();
	alias Queue=LowLockQueue!(PluginExecHandle*);

	/// One queued job. Instances live on the stack of execute(), so a worker
	/// must not touch a handle after it has published `done` (see threadMain).
	struct PluginExecHandle{
		PluginFunction fn;    // function to run on a worker thread
		Condition condition;  // signalled by the worker when fn returned
		Thread thread;        // worker that picked the job up, null while still queued
		bool done=false;      // set by the worker (under condition.mutex) after fn returned
	}

	enum pluginsThreadsNum=10;
	bool exit=false;                   // tells the workers to leave their loop
	Thread[pluginsThreadsNum] threads; // worker pool
	Queue queue;                       // jobs waiting for a worker

	Semaphore semaphore;               // wakes an idle worker when a job is queued

	/// Installs crashSignalHandle for SIGABRT and SIGSEGV. Only the first call
	/// has any effect.
	static void initializeCrashSignalCatching(){
		__gshared bool initialized=false;
		if(initialized){
			return;
		}
		initialized=true;
		//There is no signal function before android api 21
		version(Android){}else{
			signal(SIGABRT,&crashSignalHandle);
			signal(SIGSEGV,&crashSignalHandle);
		}
	}

	/// Installs the crash handlers, allocates the queue and semaphore and starts
	/// all worker threads. Must be paired with end().
	void initialize(){
		initializeCrashSignalCatching();
		queue=Mallocator.instance.make!Queue();
		semaphore = Mallocator.instance.make!Semaphore(0);
		foreach(ref th;threads){
			th=Mallocator.instance.make!Thread(&threadMain);
			th.start();
		}
	}

	/// Asks the workers to stop, joins them and releases everything allocated
	/// by initialize(). Idle workers re-check `exit` when their 100 ms semaphore
	/// wait in threadMain times out.
	void end(){
		exit=true;
		foreach(ref th;threads){
			th.join();
			Mallocator.instance.dispose(th);
		}
		Mallocator.instance.dispose(queue);
		Mallocator.instance.dispose(semaphore);
	}

	/// Replaces `thread` in the pool with a freshly started worker, but only if
	/// it belongs to the pool and is no longer running.
	void renewThread(Thread thread){
		foreach(ref th;threads){
			// Identity comparison: we are looking for this exact Thread object.
			if(th is thread){
				if(!th.isRunning()){
					Mallocator.instance.dispose(th);
					th=Mallocator.instance.make!Thread(&threadMain);
					th.start();
				}
				break;
			}
		}
	}

	/// Queues `fn` for execution on a worker thread and blocks until it finished
	/// or the worker running it terminated.
	/// Returns: true if `fn` ran to completion, false if the worker thread died
	/// while running it (the dead worker is replaced before returning).
	bool execute(PluginFunction fn){
		auto mutexScoped=scoped!Mutex();
		Mutex mutex=mutexScoped;
		auto condition=scoped!Condition(mutex);
		PluginExecHandle handle=PluginExecHandle(fn,condition);
		queue.add(&handle);

		auto waitTime=1.msecs;
		synchronized( mutex ){
			semaphore.notify();
			// `done` is the only proof of completion: a timed-out or spurious
			// wakeup simply re-evaluates the state. The worker publishes `done`
			// while holding `mutex`, so once it is observed here the worker no
			// longer touches `handle` and it is safe to leave this stack frame.
			while(!handle.done){
				if(handle.thread !is null && !handle.thread.isRunning()){
					//Thread terminated (crashed) before finishing the job
					renewThread(handle.thread);
					return false;
				}
				//Handle is waiting to be caught by some thread, or the thread is doing the job
				handle.condition.wait(waitTime);
			}
			//Job done
			return true;
		}
	}

	/// Worker loop: pops jobs from the queue and runs them until `exit` is set.
	/// When the queue is empty the worker sleeps on the semaphore for at most
	/// 100 ms before checking again.
	void threadMain(){
		while(!exit){
			PluginExecHandle* handle=queue.pop;
			if(handle !is null){
				handle.thread=Thread.getThis();
				handle.fn();
				// Publish completion and wake the waiter under the waiter's mutex.
				// The handle lives on the stack of execute(); execute() cannot
				// return before this lock is released, and nothing in the handle
				// is accessed after the release.
				synchronized(handle.condition.mutex){
					handle.done=true;
					handle.condition.notify();
				}
			}else{
				semaphore.wait(100.msecs);
			}

		}
	}

}