1 module mutils.thread; 2 3 import core.atomic; 4 import core.stdc.stdio; 5 import std.experimental.allocator; 6 import std.experimental.allocator.mallocator; 7 8 import mutils.bindings.libcoro; 9 import mutils.container.vector; 10 import mutils.job_manager.manager_utils; 11 12 version (Posix) { 13 import core.sys.posix.pthread; 14 import core.sys.posix.semaphore; 15 16 import core.sys.posix.sched : sched_yield; 17 } else version (Windows) { 18 import mutils.bindings.pthreads_windows; 19 } else { 20 static assert(0); 21 } 22 23 void msleep(int msecs) { 24 version (Posix) { 25 import core.sys.posix.unistd; 26 27 usleep(msecs * 1000); 28 } else version (Windows) { 29 import core.sys.windows.windows; 30 31 Sleep(msecs); 32 } else { 33 static assert(0); 34 } 35 } 36 37 import core.atomic; 38 39 struct Semaphore { 40 sem_t mutex; 41 42 void initialize() { 43 sem_init(&mutex, 0, 0); 44 } 45 46 void wait() { 47 int ret = sem_wait(&mutex); 48 assert(ret == 0); 49 } 50 51 bool tryWait() { 52 //return true; 53 int ret = sem_trywait(&mutex); 54 return (ret == 0); 55 } 56 57 bool timedWait(int usecs) { 58 timespec tv; 59 clock_gettime(CLOCK_REALTIME, &tv); 60 tv.tv_sec+=usecs/1_000_000; 61 tv.tv_nsec += (usecs%1_000_000)*1_000; 62 63 int ret = sem_timedwait(&mutex, &tv); 64 return (ret == 0); 65 } 66 67 void post() { 68 int ret = sem_post(&mutex); 69 assert(ret == 0); 70 } 71 72 void destroy() { 73 sem_destroy(&mutex); 74 } 75 } 76 77 struct Mutex { 78 pthread_mutex_t mutex; 79 80 void initialzie() { 81 pthread_mutex_init(&mutex, null); 82 } 83 84 void lock() { 85 pthread_mutex_lock(&mutex); 86 } 87 88 bool tryLock() { 89 int ret = pthread_mutex_trylock(&mutex); 90 return ret == 0; 91 } 92 93 void unlock() { 94 pthread_mutex_unlock(&mutex); 95 } 96 } 97 98 void instructionPause() { 99 version (X86_64) { 100 version (LDC) { 101 import ldc.gccbuiltins_x86 : __builtin_ia32_pause; 102 103 __builtin_ia32_pause(); 104 } else version (DigitalMars) { 105 asm { 106 rep; 107 nop; 108 } 109 110 } else { 111 static assert(0); 112 } 113 } 114 } 115 116 struct MutexSpinLock { 117 align(64) shared size_t lockVar; 118 119 void initialzie() { 120 } 121 122 void lock() { 123 if (cas(&lockVar, size_t(0), size_t(1))) { 124 return; 125 } 126 while (true) { 127 for (size_t n; atomicLoad!(MemoryOrder.raw)(lockVar); n++) { 128 if (n < 8) { 129 instructionPause(); 130 } else { 131 sched_yield(); 132 } 133 } 134 if (cas(&lockVar, size_t(0), size_t(1))) { 135 return; 136 } 137 } 138 139 } 140 141 bool tryLock() { 142 return cas(&lockVar, size_t(0), size_t(1)); 143 } 144 145 void unlock() { 146 atomicStore!(MemoryOrder.rel)(lockVar, size_t(0)); 147 } 148 } 149 150 extern (C) void rt_moduleTlsCtor(); 151 extern (C) void rt_moduleTlsDtor(); 152 153 extern (C) void* threadRunFunction(void* threadVoid) { 154 import core.thread : thread_attachThis, thread_detachThis; 155 156 Thread* th = cast(Thread*) threadVoid; 157 158 //auto stdThread=thread_attachThis(); 159 rt_moduleTlsCtor(); 160 161 Thread.thisThreadd = th; 162 th.threadStart(); 163 164 //thread_detachThis(); 165 rt_moduleTlsDtor(); 166 167 th.reset(); 168 pthread_exit(null); 169 return null; 170 } 171 172 struct Thread { 173 @disable this(this); 174 alias DG = void delegate(); 175 DG threadStart; 176 pthread_t handle; 177 uint threadNum = uint.max; 178 static Thread* thisThreadd; 179 180 void setDg(DG dg) { 181 threadStart = dg; 182 } 183 184 void start() { 185 int ok = pthread_create(&handle, null, &threadRunFunction, cast(void*)&this); 186 assert(ok == 0); 187 } 188 189 bool isRunning() { 190 return true; 191 } 192 193 void reset() { 194 threadStart = null; 195 threadNum = uint.max; 196 } 197 198 void join() { 199 pthread_join(handle, null); 200 handle = handle.init; 201 reset(); 202 } 203 204 static void sleep(int msecs) { 205 msleep(msecs); 206 } 207 208 static Thread* getThis() { 209 assert(thisThreadd !is null); 210 return thisThreadd; 211 } 212 213 static uint getThisThreadNum() { 214 Thread* th = Thread.getThis(); 215 auto thNum = th.threadNum; 216 return thNum; 217 } 218 219 } 220 221 enum PAGESIZE = 4096 * 4; 222 223 extern (C) void fiberRunFunction(void* threadVoid) { 224 Fiber th = cast(Fiber) threadVoid; 225 while (1) { 226 //printf("-----\n"); 227 if (th == Fiber.gRootFiber) { 228 printf("root\n"); 229 230 } 231 if (th.myThreadNum != jobManagerThreadNum) { 232 printf("myThreadNum th %d %d\n", th.myThreadNum, jobManagerThreadNum); 233 printf("myTh th %p %p\n", th.myThread, Thread.getThis); 234 printf("NNNNNUUUUUU %p\n", th.threadStart); 235 } 236 if (th.threadStart == null) { 237 printf("NNNNNUUUUUULLLLLlll %d %p\n", jobManagerThreadNum, th); 238 printf("NNNNNUUUUUU %p\n", th.threadStart); 239 } 240 assert(th.myThreadNum == jobManagerThreadNum); 241 assert(th.myThread == Thread.getThis); 242 th.threadStart(); 243 th.threadStart = null; 244 fiber_transfer(th, th.lastFiber, Fiber.State.TERM, true); 245 } 246 assert(0); 247 } 248 249 void fiber_transfer(Fiber from, Fiber to, 250 Fiber.State fiberfromStateAfterTransfer, bool backFromFiber) { 251 //printf("switch th(%d) from %p(%d) to %p, global: %p\n",jobManagerThreadNum, from, from.state, to, cast(void*)gRootFiber); 252 if (to != Fiber.gRootFiber && to.state != Fiber.State.HOLD) { 253 printf("fiber transfer statis: of to: %d\n", to.state); 254 } 255 assert(from !is null); 256 assert(to !is null); 257 assert(to.state == Fiber.State.HOLD); 258 assert(from.state == Fiber.State.EXEC); //Root is special may have any state 259 assert(to != Fiber.gRootFiber || backFromFiber); 260 assert(to == Fiber.gRootFiber || to.threadStart !is null); // Root may not have startFunc 261 262 from.state = fiberfromStateAfterTransfer; 263 if (!backFromFiber) 264 to.lastFiber = Fiber.gCurrentFiber; 265 if (backFromFiber) 266 from.lastFiber = null; 267 to.state = Fiber.State.EXEC; 268 Fiber.gCurrentFiber = to; 269 270 coro_transfer(&from.context, &to.context); 271 Fiber.gCurrentFiber = from; 272 from.state = Fiber.State.EXEC; 273 //to.state=fiberToStateAfterExit;// to state is set by fibercalling to this fiber 274 275 } 276 277 final class Fiber { 278 static Fiber gCurrentFiber; 279 static Fiber gRootFiber; 280 //@disable this(this); 281 alias DG = void delegate(); 282 283 enum State : ubyte { 284 HOLD = 0, 285 TERM = 1, 286 EXEC = 2, 287 } 288 289 align(128) coro_context context; 290 DG threadStart; 291 Fiber lastFiber; 292 Thread* myThread; 293 uint myThreadNum = int.max; 294 size_t pageSize = PAGESIZE * 32u; 295 296 bool created = false; 297 State state; 298 299 this() { 300 myThreadNum = jobManagerThreadNum; 301 myThread = Thread.getThis(); 302 } 303 304 this(size_t pageSize) { 305 myThreadNum = jobManagerThreadNum; 306 this.pageSize = pageSize; 307 myThread = Thread.getThis(); 308 } 309 310 this(DG dg, size_t pageSize) { 311 myThreadNum = jobManagerThreadNum; 312 threadStart = dg; 313 this.pageSize = pageSize; 314 myThread = Thread.getThis(); 315 } 316 317 ~this() { 318 lastFiber = null; 319 state = cast(State) 0xffff; 320 pageSize = 3; 321 threadStart = null; 322 //coro_destroy(&context); 323 } 324 325 void reset(DG dg) { 326 assert(state == State.TERM); 327 state = State.HOLD; 328 threadStart = dg; 329 } 330 331 __gshared static MutexSpinLock mutex; 332 333 static void initializeStatic() { 334 //printf("Fiber.initializeStatic\n"); 335 gRootFiber = Mallocator.instance.make!(Fiber)(); 336 assert(gRootFiber.state == State.HOLD); 337 mutex.lock(); 338 coro_create(&gRootFiber.context, null, null, null, 0); 339 mutex.unlock(); 340 gRootFiber.state = State.EXEC; 341 342 } 343 344 void call() { 345 346 if (gCurrentFiber is null) { 347 gCurrentFiber = gRootFiber; 348 } 349 350 if (created == false) { 351 created = true; 352 353 coro_stack stack; 354 355 import core.stdc.stdlib : malloc; 356 357 void* mem = malloc(pageSize); 358 stack.sptr = mem; 359 stack.ssze = pageSize; 360 //int ok=coro_stack_alloc(&stack, 1024*1024); 361 //assert(ok); 362 //printf("stack(%p), size %d\n", stack.sptr ,stack.ssze); 363 //printf("create(%d) corr(%p)\n", jobManagerThreadNum ,this); 364 assert(myThreadNum == jobManagerThreadNum); 365 366 mutex.lock(); 367 coro_create(&context, &fiberRunFunction, cast(void*) this, stack.sptr, stack.ssze); 368 mutex.unlock(); 369 } 370 assert(jobManagerThreadNum == myThreadNum); 371 372 auto fib = gCurrentFiber; 373 assert(this.threadStart !is null); 374 fiber_transfer(fib, this, Fiber.State.HOLD, false); 375 376 } 377 378 static void yield() { 379 //printf("yield %d\n", jobManagerThreadNum); 380 auto fib = gCurrentFiber; 381 assert(fib != gRootFiber); 382 fiber_transfer(fib, fib.lastFiber, Fiber.State.HOLD, true); 383 } 384 385 static Fiber getThis() { 386 if (gCurrentFiber is null || gCurrentFiber.state != Fiber.State.EXEC) { 387 return null; 388 } 389 assert(gCurrentFiber != gRootFiber); 390 return gCurrentFiber; 391 } 392 393 } 394 395 unittest { 396 Fiber fb; 397 enum nestageLevelMax = 10; 398 int nestageLevel = 0; 399 400 void testNest() { 401 if (nestageLevel >= nestageLevelMax) { 402 return; 403 } 404 nestageLevel++; 405 Fiber f = Mallocator.instance.make!Fiber(&testNest, PAGESIZE * 32u); 406 scope (exit) 407 Mallocator.instance.dispose(f); 408 f.call(); 409 } 410 411 void testFunc() { 412 assert(Fiber.getThis().lastFiber != Fiber.getThis()); 413 assert(Fiber.getThis().lastFiber == fb); 414 Fiber.yield(); 415 assert(Fiber.getThis().lastFiber != Fiber.getThis()); 416 assert(Fiber.getThis().lastFiber == fb); 417 Fiber.yield(); 418 assert(Fiber.getThis().lastFiber != Fiber.getThis()); 419 assert(Fiber.getThis().lastFiber == fb); 420 421 Fiber f = Mallocator.instance.make!Fiber(&testNest, PAGESIZE * 32u); 422 scope (exit) 423 Mallocator.instance.dispose(f); 424 f.call(); 425 } 426 427 void mainFiber() { 428 assert(Fiber.getThis() == fb); 429 Fiber f = Mallocator.instance.make!Fiber(&testFunc, PAGESIZE * 32u); 430 scope (exit) 431 Mallocator.instance.dispose(f); 432 433 assert(f.state == Fiber.State.HOLD); 434 f.call(); 435 assert(f.state == Fiber.State.HOLD); 436 f.call(); 437 assert(f.state == Fiber.State.HOLD); 438 f.call(); 439 assert(f.state == Fiber.State.TERM); 440 assert(Fiber.getThis() == fb); 441 } 442 443 void threadStart() { 444 Fiber.initializeStatic(); 445 fb = Mallocator.instance.make!Fiber(&mainFiber, PAGESIZE * 32u); 446 scope (exit) 447 Mallocator.instance.dispose(fb); 448 449 assert(fb.state == Fiber.State.HOLD); 450 fb.call(); 451 assert(fb.state == Fiber.State.TERM); 452 assert(nestageLevel == nestageLevelMax); 453 } 454 455 Thread th; 456 th.threadNum = 5674; 457 th.setDg(&threadStart); 458 th.start(); 459 th.join(); 460 }