1 module mutils.thread;
2 
3 import core.atomic;
4 import core.stdc.stdio;
5 import std.experimental.allocator;
6 import std.experimental.allocator.mallocator;
7 
8 import mutils.bindings.libcoro;
9 import mutils.container.vector;
10 import mutils.job_manager.manager_utils;
11 
12 version (Posix) {
13 	import core.sys.posix.pthread;
14 	import core.sys.posix.semaphore;
15 
16 	import core.sys.posix.sched : sched_yield;
17 } else version (Windows) {
18 	import mutils.bindings.pthreads_windows;
19 } else {
20 	static assert(0);
21 }
22 
/**
 * Suspends the calling thread for roughly `msecs` milliseconds.
 *
 * Params:
 *   msecs = time to sleep in milliseconds; negative values are treated as 0.
 *
 * On Posix the sleep may end early if a signal is delivered (the call is
 * not restarted), which matches the behaviour of the previous `usleep`
 * based implementation.
 */
void msleep(int msecs) {
	// A negative duration would otherwise wrap around to an enormous
	// unsigned value in the underlying OS call.
	if (msecs < 0) {
		msecs = 0;
	}
	version (Posix) {
		import core.sys.posix.time;

		// nanosleep takes seconds and nanoseconds separately, so there is
		// no `msecs * 1000` overflow and no dependence on usleep accepting
		// arguments of one second or more (POSIX allows EINVAL there).
		timespec request;
		request.tv_sec = msecs / 1000;
		request.tv_nsec = (msecs % 1000) * 1_000_000;
		nanosleep(&request, null);
	} else version (Windows) {
		import core.sys.windows.windows;

		Sleep(msecs);
	} else {
		static assert(0);
	}
}
36 
37 import core.atomic;
38 
/**
 * Thin wrapper around an unnamed counting semaphore (`sem_t`).
 *
 * `initialize` must be called before any other method and `destroy`
 * once the semaphore is no longer needed.
 */
struct Semaphore {
	sem_t mutex;

	/// Initializes the semaphore as process-private with a count of 0.
	void initialize() {
		sem_init(&mutex, 0, 0);
	}

	/// Blocks until the count is positive, then decrements it.
	void wait() {
		import core.stdc.errno : EINTR, errno;

		int ret;
		// A signal handler may interrupt sem_wait; that is not a failure,
		// so simply wait again.
		do {
			ret = sem_wait(&mutex);
		}
		while (ret != 0 && errno == EINTR);
		assert(ret == 0);
	}

	/// Decrements the count if it is positive.
	/// Returns: `true` when the semaphore was acquired, `false` otherwise.
	bool tryWait() {
		//return true;
		int ret = sem_trywait(&mutex);
		return (ret == 0);
	}

	/**
	 * Waits for the semaphore for at most `usecs` microseconds.
	 *
	 * Params:
	 *   usecs = relative timeout in microseconds.
	 * Returns: `true` when the semaphore was acquired, `false` on timeout
	 *          or error.
	 */
	bool timedWait(int usecs) {
		import core.stdc.errno : EINTR, errno;

		enum nsecsPerSec = 1_000_000_000;

		// sem_timedwait expects an absolute CLOCK_REALTIME deadline.
		timespec tv;
		clock_gettime(CLOCK_REALTIME, &tv);
		tv.tv_sec += usecs / 1_000_000;
		tv.tv_nsec += (usecs % 1_000_000) * 1_000;

		// Keep tv_nsec inside [0, 1e9): values outside that range make
		// sem_timedwait fail with EINVAL instead of waiting.
		if (tv.tv_nsec >= nsecsPerSec) {
			tv.tv_sec += 1;
			tv.tv_nsec -= nsecsPerSec;
		} else if (tv.tv_nsec < 0) {
			tv.tv_sec -= 1;
			tv.tv_nsec += nsecsPerSec;
		}

		int ret;
		// The deadline is absolute, so retrying after a signal interruption
		// does not extend the total waiting time.
		do {
			ret = sem_timedwait(&mutex, &tv);
		}
		while (ret != 0 && errno == EINTR);
		return (ret == 0);
	}

	/// Increments the count, waking one waiter if any is blocked.
	void post() {
		int ret = sem_post(&mutex);
		assert(ret == 0);
	}

	/// Releases the resources held by the semaphore.
	void destroy() {
		sem_destroy(&mutex);
	}
}
76 
/**
 * Thin wrapper around a `pthread_mutex_t` created with default attributes.
 *
 * The mutex must be initialized (`initialize` / `initialzie`) before use.
 */
struct Mutex {
	pthread_mutex_t mutex;

	/// Initializes the mutex with default attributes.
	/// The misspelled name is kept because existing callers use it.
	void initialzie() {
		pthread_mutex_init(&mutex, null);
	}

	/// Correctly spelled name for `initialzie`, matching `Semaphore.initialize`.
	alias initialize = initialzie;

	/// Blocks until the mutex is acquired.
	void lock() {
		pthread_mutex_lock(&mutex);
	}

	/// Attempts to acquire the mutex without blocking.
	/// Returns: `true` when the mutex was acquired.
	bool tryLock() {
		int ret = pthread_mutex_trylock(&mutex);
		return ret == 0;
	}

	/// Releases the mutex.
	void unlock() {
		pthread_mutex_unlock(&mutex);
	}
}
97 
/**
 * Emits a spin-wait hint instruction on x86-64.
 *
 * DMD encodes it as `rep; nop`, LDC uses the `__builtin_ia32_pause`
 * intrinsic. On x86-64 with any other compiler compilation fails; on other
 * architectures the function body is empty.
 */
void instructionPause() {
	version (X86_64) {
		version (DigitalMars) {
			asm {
				rep;
				nop;
			}
		} else version (LDC) {
			import ldc.gccbuiltins_x86 : __builtin_ia32_pause;

			__builtin_ia32_pause();
		} else {
			static assert(0);
		}
	}
}
115 
/**
 * Spin lock built on a single atomic word: 0 means free, 1 means held.
 *
 * Waiters first spin with `instructionPause` and fall back to
 * `sched_yield` once the lock stays busy.
 */
struct MutexSpinLock {
	align(64) shared size_t lockVar;

	/// Nothing to set up; the zero-initialized state is "unlocked".
	/// The misspelled name is kept because existing callers use it.
	void initialzie() {
	}

	/// Correctly spelled name for `initialzie`, matching `Semaphore.initialize`.
	alias initialize = initialzie;

	/// Acquires the lock, spinning (and eventually yielding) until it is free.
	void lock() {
		// Try to take the lock; on failure wait until it looks free and retry.
		while (!cas(&lockVar, size_t(0), size_t(1))) {
			// Read-only wait loop: only attempt the CAS again once the
			// word has been observed as 0.
			for (size_t n; atomicLoad!(MemoryOrder.raw)(lockVar); n++) {
				if (n < 8) {
					instructionPause();
				} else {
					// Lock held for a while: give up the time slice.
					sched_yield();
				}
			}
		}
	}

	/// Attempts to acquire the lock without waiting.
	/// Returns: `true` when the lock was acquired.
	bool tryLock() {
		return cas(&lockVar, size_t(0), size_t(1));
	}

	/// Releases the lock with a release store.
	void unlock() {
		atomicStore!(MemoryOrder.rel)(lockVar, size_t(0));
	}
}
149 
// Runtime hooks that run / tear down the thread-local module constructors.
extern (C) void rt_moduleTlsCtor();
extern (C) void rt_moduleTlsDtor();

/**
 * Entry point handed to `pthread_create` by `Thread.start`.
 *
 * Params:
 *   threadVoid = pointer to the `Thread` that is being started.
 *
 * Runs the thread-local module constructors, publishes the `Thread` as
 * `Thread.thisThreadd`, invokes its delegate, runs the thread-local module
 * destructors, resets the `Thread` and finally exits the pthread.
 */
extern (C) void* threadRunFunction(void* threadVoid) {
	import core.thread : thread_attachThis, thread_detachThis;

	auto self = cast(Thread*) threadVoid;

	// The thread is not attached via thread_attachThis(); only the TLS
	// module constructors are run manually.
	rt_moduleTlsCtor();

	Thread.thisThreadd = self;
	self.threadStart();

	// Likewise no thread_detachThis(); only the TLS module destructors run.
	rt_moduleTlsDtor();

	self.reset();
	pthread_exit(null);
	return null;
}
171 
/**
 * Minimal pthread-backed thread that runs a delegate.
 *
 * Copying is disabled: `start` hands the address of this struct to the new
 * thread, so the instance must stay at the same location while it runs.
 */
struct Thread {
	@disable this(this);
	alias DG = void delegate();
	/// Delegate executed by the thread; cleared by `reset`.
	DG threadStart;
	/// Handle filled in by `pthread_create`.
	pthread_t handle;
	/// Caller-assigned number; `uint.max` when unset.
	uint threadNum = uint.max;
	/// The `Thread` running on the current OS thread (set by `threadRunFunction`).
	static Thread* thisThreadd;

	/// Stores the delegate that `start` will run.
	void setDg(DG dg) {
		this.threadStart = dg;
	}

	/// Creates the OS thread, which enters `threadRunFunction` with `&this`.
	void start() {
		immutable rc = pthread_create(&handle, null, &threadRunFunction, cast(void*)&this);
		assert(rc == 0);
	}

	/// Always reports `true`; no running state is tracked.
	bool isRunning() {
		return true;
	}

	/// Clears the delegate and the thread number.
	void reset() {
		threadNum = uint.max;
		threadStart = null;
	}

	/// Waits for the thread to finish, then clears the handle and resets.
	void join() {
		pthread_join(handle, null);
		handle = typeof(handle).init;
		reset();
	}

	/// Sleeps the calling thread for `msecs` milliseconds.
	static void sleep(int msecs) {
		msleep(msecs);
	}

	/// Returns: the `Thread` of the calling thread; asserts that one is set.
	static Thread* getThis() {
		auto current = thisThreadd;
		assert(current !is null);
		return current;
	}

	/// Returns: `threadNum` of the calling thread's `Thread`.
	static uint getThisThreadNum() {
		return Thread.getThis().threadNum;
	}

}
220 
221 enum PAGESIZE = 4096 * 4;
222 
/**
 * Coroutine entry point registered with `coro_create` by `Fiber.call`.
 *
 * Params:
 *   threadVoid = the `Fiber` this coroutine belongs to.
 *
 * Loops forever: runs the fiber's delegate, clears it, then transfers back
 * to `lastFiber` leaving this fiber in the TERM state. The loop continues
 * when the fiber is switched to again.
 */
extern (C) void fiberRunFunction(void* threadVoid) {
	Fiber fiber = cast(Fiber) threadVoid;
	while (true) {
		// Diagnostics printed just before the sanity asserts below.
		if (fiber == Fiber.gRootFiber) {
			printf("root\n");
		}
		if (fiber.myThreadNum != jobManagerThreadNum) {
			printf("myThreadNum th %d %d\n", fiber.myThreadNum, jobManagerThreadNum);
			printf("myTh th %p %p\n", fiber.myThread, Thread.getThis);
			printf("NNNNNUUUUUU %p\n", fiber.threadStart);
		}
		if (fiber.threadStart == null) {
			printf("NNNNNUUUUUULLLLLlll %d %p\n", jobManagerThreadNum, fiber);
			printf("NNNNNUUUUUU %p\n", fiber.threadStart);
		}

		// The fiber must be running on the thread recorded at construction.
		assert(fiber.myThreadNum == jobManagerThreadNum);
		assert(fiber.myThread == Thread.getThis);

		fiber.threadStart();
		fiber.threadStart = null;

		// Delegate finished: hand control back and mark this fiber TERM.
		fiber_transfer(fiber, fiber.lastFiber, Fiber.State.TERM, true);
	}
	assert(0);
}
248 
/**
 * Switches execution from fiber `from` to fiber `to`.
 *
 * Params:
 *   from = the currently executing fiber (must be in state EXEC).
 *   to = the fiber to resume (must be in state HOLD).
 *   fiberfromStateAfterTransfer = state stored in `from` while it is
 *       suspended.
 *   backFromFiber = `true` when returning to the fiber that called `from`
 *       (clears `from.lastFiber`); `false` when calling into `to`
 *       (records the current fiber in `to.lastFiber`).
 *
 * The function returns when something transfers back to `from`; at that
 * point `from` is made the current fiber again and put back into EXEC.
 */
void fiber_transfer(Fiber from, Fiber to,
		Fiber.State fiberfromStateAfterTransfer, bool backFromFiber) {
	//printf("switch th(%d) from %p(%d) to %p,      global: %p\n",jobManagerThreadNum, from, from.state, to, cast(void*)gRootFiber);

	// Null checks come first so that a null fiber trips an assert instead
	// of being dereferenced by the diagnostic below.
	assert(from !is null);
	assert(to !is null);

	if (to != Fiber.gRootFiber && to.state != Fiber.State.HOLD) {
		printf("fiber transfer statis: of to: %d\n", to.state);
	}
	assert(to.state == Fiber.State.HOLD);
	assert(from.state == Fiber.State.EXEC); //Root is special may have any state
	assert(to != Fiber.gRootFiber || backFromFiber);
	assert(to == Fiber.gRootFiber || to.threadStart !is null); // Root may not have startFunc

	from.state = fiberfromStateAfterTransfer;
	if (backFromFiber) {
		// Returning to the caller: `from` no longer has a fiber to go back to.
		from.lastFiber = null;
	} else {
		// Calling into `to`: remember who to return to.
		to.lastFiber = Fiber.gCurrentFiber;
	}
	to.state = Fiber.State.EXEC;
	Fiber.gCurrentFiber = to;

	coro_transfer(&from.context, &to.context);

	// Execution continues here once another fiber transfers back to `from`.
	Fiber.gCurrentFiber = from;
	from.state = Fiber.State.EXEC;
	//to.state=fiberToStateAfterExit;// to state is set by fibercalling to this fiber

}
276 
/**
 * Coroutine built on libcoro (`coro_context`), bound to the thread that
 * constructed it.
 *
 * A fiber is given a delegate, started or resumed with `call`, and gives
 * control back to its caller with `yield`. The stack is allocated lazily on
 * the first `call` and released by the destructor.
 */
final class Fiber {
	/// Fiber currently executing on this thread (plain `static`, so thread-local).
	static Fiber gCurrentFiber;
	/// Fiber representing the thread's own context; set by `initializeStatic`.
	static Fiber gRootFiber;
	//@disable this(this);
	alias DG = void delegate();

	enum State : ubyte {
		HOLD = 0, /// suspended; may be switched to
		TERM = 1, /// delegate finished; `reset` makes it HOLD again
		EXEC = 2, /// currently executing
	}

	align(128) coro_context context;
	/// Delegate run by the fiber.
	DG threadStart;
	/// Fiber to return to on `yield` or when the delegate finishes.
	Fiber lastFiber;
	/// Thread that constructed this fiber.
	Thread* myThread;
	/// `jobManagerThreadNum` at construction time.
	uint myThreadNum = int.max;
	/// Size in bytes of the stack allocated on first `call`.
	size_t pageSize = PAGESIZE * 32u;

	/// Whether the coroutine context and stack have been created.
	bool created = false;
	State state;
	/// Stack memory malloc'ed by `call`; owned by this fiber.
	private void* stackMemory;

	/// Creates a fiber without a delegate, using the default stack size.
	this() {
		myThreadNum = jobManagerThreadNum;
		myThread = Thread.getThis();
	}

	/// Creates a fiber without a delegate and with a `pageSize`-byte stack.
	this(size_t pageSize) {
		myThreadNum = jobManagerThreadNum;
		this.pageSize = pageSize;
		myThread = Thread.getThis();
	}

	/// Creates a fiber that runs `dg` on a `pageSize`-byte stack.
	this(DG dg, size_t pageSize) {
		myThreadNum = jobManagerThreadNum;
		threadStart = dg;
		this.pageSize = pageSize;
		myThread = Thread.getThis();
	}

	/// Frees the fiber's stack and overwrites its fields with poison values.
	/// Must not be run while this fiber is the one executing, since that
	/// would free the stack currently in use.
	~this() {
		import core.stdc.stdlib : free;

		// Previously the stack allocated in call() was never released.
		// free(null) is a no-op for fibers that were never called.
		free(stackMemory);
		stackMemory = null;

		lastFiber = null;
		state = cast(State) 0xffff;
		pageSize = 3;
		threadStart = null;
		//coro_destroy(&context);
	}

	/// Re-arms a terminated fiber with a new delegate.
	void reset(DG dg) {
		assert(state == State.TERM);
		state = State.HOLD;
		threadStart = dg;
	}

	/// Process-wide lock held around every `coro_create` call.
	__gshared static MutexSpinLock mutex;

	/// Creates the root fiber for the calling thread and marks it EXEC.
	/// Requires `Thread.thisThreadd` to be set (the constructor calls
	/// `Thread.getThis`).
	static void initializeStatic() {
		//printf("Fiber.initializeStatic\n");
		gRootFiber = Mallocator.instance.make!(Fiber)();
		assert(gRootFiber.state == State.HOLD);
		mutex.lock();
		// Null function and stack: creates an empty context for the root.
		coro_create(&gRootFiber.context, null, null, null, 0);
		mutex.unlock();
		gRootFiber.state = State.EXEC;

	}

	/// Switches from the current fiber into this one; returns when this
	/// fiber yields or its delegate finishes.
	void call() {

		if (gCurrentFiber is null) {
			gCurrentFiber = gRootFiber;
		}

		if (created == false) {
			created = true;

			coro_stack stack;

			import core.stdc.stdlib : malloc;

			void* mem = malloc(pageSize);
			assert(mem !is null);
			// Keep the pointer so the destructor can release the stack.
			stackMemory = mem;
			stack.sptr = mem;
			stack.ssze = pageSize;
			//int ok=coro_stack_alloc(&stack, 1024*1024);
			//assert(ok);
			//printf("stack(%p), size %d\n", stack.sptr ,stack.ssze);
			//printf("create(%d) corr(%p)\n", jobManagerThreadNum ,this);
			assert(myThreadNum == jobManagerThreadNum);

			mutex.lock();
			coro_create(&context, &fiberRunFunction, cast(void*) this, stack.sptr, stack.ssze);
			mutex.unlock();
		}
		assert(jobManagerThreadNum == myThreadNum);

		auto fib = gCurrentFiber;
		assert(this.threadStart !is null);
		// The caller is parked in HOLD until this fiber transfers back.
		fiber_transfer(fib, this, Fiber.State.HOLD, false);

	}

	/// Suspends the current (non-root) fiber and resumes the fiber that
	/// called it.
	static void yield() {
		//printf("yield %d\n", jobManagerThreadNum);		
		auto fib = gCurrentFiber;
		assert(fib != gRootFiber);
		fiber_transfer(fib, fib.lastFiber, Fiber.State.HOLD, true);
	}

	/// Returns: the currently executing fiber, or `null` when no fiber is
	/// current or the current one is not in state EXEC. Asserts that the
	/// returned fiber is not the root fiber.
	static Fiber getThis() {
		if (gCurrentFiber is null || gCurrentFiber.state != Fiber.State.EXEC) {
			return null;
		}
		assert(gCurrentFiber != gRootFiber);
		return gCurrentFiber;
	}

}
394 
// Exercises Fiber call/yield/termination and nested fiber creation on a
// dedicated Thread.
unittest {
	enum maxNesting = 10;
	int nesting = 0;
	Fiber fb;

	// Each invocation spawns one more fiber running itself until the
	// nesting limit is reached.
	void testNest() {
		if (nesting < maxNesting) {
			nesting++;
			Fiber nested = Mallocator.instance.make!Fiber(&testNest, PAGESIZE * 32u);
			scope (exit)
				Mallocator.instance.dispose(nested);
			nested.call();
		}
	}

	// Yields twice, checking each time that the caller is still `fb`, then
	// starts the nesting chain.
	void testFunc() {
		void checkCaller() {
			assert(Fiber.getThis().lastFiber != Fiber.getThis());
			assert(Fiber.getThis().lastFiber == fb);
		}

		checkCaller();
		Fiber.yield();
		checkCaller();
		Fiber.yield();
		checkCaller();

		Fiber nested = Mallocator.instance.make!Fiber(&testNest, PAGESIZE * 32u);
		scope (exit)
			Mallocator.instance.dispose(nested);
		nested.call();
	}

	// Drives testFunc through its two yields and its termination.
	void mainFiber() {
		assert(Fiber.getThis() == fb);
		Fiber child = Mallocator.instance.make!Fiber(&testFunc, PAGESIZE * 32u);
		scope (exit)
			Mallocator.instance.dispose(child);

		assert(child.state == Fiber.State.HOLD);
		foreach (_; 0 .. 2) {
			child.call();
			assert(child.state == Fiber.State.HOLD);
		}
		child.call();
		assert(child.state == Fiber.State.TERM);
		assert(Fiber.getThis() == fb);
	}

	// Body of the worker thread: sets up the root fiber and runs mainFiber.
	void threadStart() {
		Fiber.initializeStatic();
		fb = Mallocator.instance.make!Fiber(&mainFiber, PAGESIZE * 32u);
		scope (exit)
			Mallocator.instance.dispose(fb);

		assert(fb.state == Fiber.State.HOLD);
		fb.call();
		assert(fb.state == Fiber.State.TERM);
		assert(nesting == maxNesting);
	}

	Thread worker;
	worker.threadNum = 5674;
	worker.setDg(&threadStart);
	worker.start();
	worker.join();
}