code / tomo-pthreads

Lines125 Tomo116 Markdown9
(141 lines)
1 # A POSIX Threads (pthreads) wrapper
2 use <pthread.h>
# A mutual-exclusion lock wrapping a GC-allocated `pthread_mutex_t`.
# `_mutex` holds the raw pointer to that C object.
struct Mutex(_mutex:@Memory)
    # Allocate a `pthread_mutex_t` with GC_MALLOC, initialize it with default
    # (NULL) attributes, and register `pthread_mutex_destroy` as its GC
    # finalizer.
    func new(->Mutex)
        return Mutex(
            C_code : @Memory`
                pthread_mutex_t *mtx = GC_MALLOC(sizeof(pthread_mutex_t));
                pthread_mutex_init(mtx, NULL);
                GC_register_finalizer(mtx, (void*)pthread_mutex_destroy, NULL, NULL, NULL);
                mtx
            `
        )

    # Acquire the mutex via `pthread_mutex_lock`.
    # Fails if the call returns a nonzero status.
    func lock(m:Mutex)
        rc := C_code:Int32`pthread_mutex_lock(@(m._mutex))`
        if rc != 0
            fail("Failed to lock Mutex")

    # Release the mutex via `pthread_mutex_unlock`.
    # Fails if the call returns a nonzero status.
    func unlock(m:Mutex)
        rc := C_code:Int32`pthread_mutex_unlock(@(m._mutex))`
        if rc != 0
            fail("Failed to unlock Mutex")
# A condition variable wrapping a GC-allocated `pthread_cond_t`.
# `_cond` holds the raw pointer to that C object.
struct Condition(_cond:@Memory)
    # Allocate a `pthread_cond_t` with GC_MALLOC, initialize it with default
    # (NULL) attributes, and register `pthread_cond_destroy` as its GC
    # finalizer.
    func new(->Condition)
        return Condition(
            C_code : @Memory `
                pthread_cond_t *cv = GC_MALLOC(sizeof(pthread_cond_t));
                pthread_cond_init(cv, NULL);
                GC_register_finalizer(cv, (void*)pthread_cond_destroy, NULL, NULL, NULL);
                cv
            `
        )

    # Wait on the condition via `pthread_cond_wait`, passing `mutex` as the
    # associated lock. Fails if the call returns a nonzero status.
    # NOTE(review): POSIX expects the caller to already hold `mutex`; nothing
    # here checks that.
    func wait(cond:Condition, mutex:Mutex)
        rc := C_code:Int32`pthread_cond_wait(@(cond._cond), @(mutex._mutex))`
        fail("Failed to wait on Condition") if rc != 0

    # Signal the condition via `pthread_cond_signal`.
    # Fails if the call returns a nonzero status.
    func signal(cond:Condition)
        rc := C_code:Int32`pthread_cond_signal(@(cond._cond))`
        fail("Failed to signal Condition") if rc != 0

    # Broadcast on the condition via `pthread_cond_broadcast`.
    # Fails if the call returns a nonzero status.
    func broadcast(cond:Condition)
        rc := C_code:Int32`pthread_cond_broadcast(@(cond._cond))`
        fail("Failed to broadcast Condition") if rc != 0
# A reader/writer lock wrapping a GC-allocated `pthread_rwlock_t`.
# `_rwlock` holds the raw pointer to that C object.
struct RWLock(_rwlock:@Memory)
    # Allocate a `pthread_rwlock_t` with GC_MALLOC, initialize it with default
    # (NULL) attributes, and register `pthread_rwlock_destroy` as its GC
    # finalizer.
    func new(->RWLock)
        return RWLock(
            C_code : @Memory `
                pthread_rwlock_t *rw = GC_MALLOC(sizeof(pthread_rwlock_t));
                pthread_rwlock_init(rw, NULL);
                GC_register_finalizer(rw, (void*)pthread_rwlock_destroy, NULL, NULL, NULL);
                rw
            `
        )

    # Acquire the lock for reading via `pthread_rwlock_rdlock`.
    # Fails if the call returns a nonzero status.
    func read_lock(lock:RWLock)
        rc := C_code:Int32 `pthread_rwlock_rdlock(@(lock._rwlock))`
        fail("Failed to read-lock RWLock") if rc != 0

    # Acquire the lock for writing via `pthread_rwlock_wrlock`.
    # Fails if the call returns a nonzero status.
    func write_lock(lock:RWLock)
        rc := C_code:Int32 `pthread_rwlock_wrlock(@(lock._rwlock))`
        fail("Failed to write-lock RWLock") if rc != 0

    # Release the lock via `pthread_rwlock_unlock`.
    # Fails if the call returns a nonzero status.
    func unlock(lock:RWLock)
        rc := C_code:Int32 `pthread_rwlock_unlock(@(lock._rwlock))`
        fail("Failed to unlock RWLock") if rc != 0
# A handle to a POSIX thread, stored as a pointer to a GC-allocated
# `pthread_t` in `_pthread`.
struct PThread(_pthread:@Memory)
    # Start a new thread that runs `fn`.
    # The closure's function pointer and userdata are handed to
    # `pthread_create` as the start routine and its argument.
    # Fails if `pthread_create` returns a nonzero status, so callers never
    # receive a handle to an uninitialized `pthread_t`.
    func new(fn:func() -> PThread)
        thread := C_code:@Memory `GC_MALLOC(sizeof(pthread_t))`
        status := C_code:Int32 `pthread_create(@thread, NULL, @fn.fn, @fn.userdata)`
        fail("Failed to create PThread") if status != 0
        return PThread(thread)

    # Wait for the thread to finish via `pthread_join`, discarding its return
    # value (NULL is passed as the result pointer).
    # Fails if the call returns a nonzero status.
    func join(p:PThread)
        status := C_code:Int32 `pthread_join(*(pthread_t*)@(p._pthread), NULL)`
        fail("Failed to join PThread") if status != 0

    # Request cancellation of the thread via `pthread_cancel`.
    # Fails if the call returns a nonzero status.
    func cancel(p:PThread)
        status := C_code:Int32 `pthread_cancel(*(pthread_t*)@(p._pthread))`
        fail("Failed to cancel PThread") if status != 0

    # Detach the thread via `pthread_detach`.
    # Fails if the call returns a nonzero status.
    func detach(p:PThread)
        status := C_code:Int32 `pthread_detach(*(pthread_t*)@(p._pthread))`
        fail("Failed to detach PThread") if status != 0

    # Original (misspelled) name for `detach`, kept unchanged so existing
    # callers keep working.
    func detatch(p:PThread)
        status := C_code:Int32 `pthread_detach(*(pthread_t*)@(p._pthread))`
        fail("Failed to detatch PThread") if status != 0
# A queue of Ints shared between threads.
# `_queue` is the backing list, `_mutex` guards access to it, and `_cond` is
# signaled whenever an item is added.
struct IntQueue(_queue:@[Int], _mutex:Mutex, _cond:Condition)
    # Build a queue holding a copy of `initial` (empty by default), with a
    # fresh Mutex and Condition.
    func new(initial:[Int]=[] -> IntQueue)
        storage := @initial
        return IntQueue(storage, Mutex.new(), Condition.new())

    # Insert `n` into the queue while holding the mutex, then signal the
    # condition after the mutex has been released.
    func give(q:IntQueue, n:Int)
        q._mutex.lock()
        q._queue.insert(n)
        q._mutex.unlock()
        q._cond.signal()

    # Pop the item at index 1 and return it, waiting on the condition for as
    # long as the pop yields nothing.
    func take(q:IntQueue -> Int)
        q._mutex.lock()
        item := q._queue.pop(1)
        # Retry the pop after every wakeup until an item is available.
        while not item
            q._cond.wait(q._mutex)
            item = q._queue.pop(1)
        q._mutex.unlock()
        return item!
# Demo: a "boss" (this thread) hands jobs to a worker thread through one
# IntQueue and collects the results through another.
func main()
    jobs := IntQueue.new()
    results := IntQueue.new()

    # Serialize console output so lines from the two threads do not interleave.
    print_lock := Mutex.new()
    announce := func(speaker:Text, text:Text)
        print_lock.lock()
        say("\[2][$speaker]\[] $text")
        print_lock.unlock()

    # The worker loops forever: take a job, multiply it by 10, give the result.
    worker_thread := PThread.new(func()
        say("I'm in the thread!")
        repeat
            announce("worker", "waiting for job")
            job := jobs.take()
            result := job * 10
            announce("worker", "Jobbing $job into $result")
            results.give(result)
            announce("worker", "Signaled $result")
    )

    # Hand out the jobs.
    for i in 10
        announce("boss", "Pushing job $i")
        jobs.give(i)
        announce("boss", "Gave job $i")

    # Collect one result per job.
    for i in 10
        announce("boss", "Getting result...")
        result := results.take()
        announce("boss", "Got result $result")

    # The worker never exits on its own, so cancel it.
    >> worker_thread.cancel()