aboutsummaryrefslogtreecommitdiff
path: root/examples/pthreads/pthreads.tm
blob: 07a80e95424c5b798cd718bffc2cc5be918e176d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# A Posix Threads (pthreads) wrapper
use <pthread.h>

# Raw bindings to the C pthread mutex calls. Each returns an Int32 status
# code, which the pthread_mutex_t methods in this file compare against 0.
extern pthread_mutex_lock:func(mutex:&pthread_mutex_t -> Int32)
extern pthread_mutex_unlock:func(mutex:&pthread_mutex_t -> Int32)

struct pthread_mutex_t(; extern, opaque):
    # Allocate a mutex with GC_MALLOC, initialize it with default (NULL)
    # attributes, and register pthread_mutex_destroy as its GC finalizer.
    # NOTE: the status returned by pthread_mutex_init is not checked.
    func new(->@pthread_mutex_t):
        return inline C : @pthread_mutex_t {
            pthread_mutex_t *handle = GC_MALLOC(sizeof(pthread_mutex_t));
            pthread_mutex_init(handle, NULL);
            GC_register_finalizer(handle, (void*)pthread_mutex_destroy, NULL, NULL, NULL);
            handle
        }

    # Lock the mutex; fails if pthread_mutex_lock returns a nonzero status.
    func lock(m:&pthread_mutex_t):
        if pthread_mutex_lock(m) != 0:
            fail("Failed to lock mutex")

    # Unlock the mutex; fails if pthread_mutex_unlock returns a nonzero status.
    func unlock(m:&pthread_mutex_t):
        if pthread_mutex_unlock(m) != 0:
            fail("Failed to unlock mutex")

# Raw bindings to the C pthread condition-variable calls. Each returns an
# Int32 status code, which the pthread_cond_t methods in this file compare
# against 0.
extern pthread_cond_wait:func(cond:&pthread_cond_t, mutex:&pthread_mutex_t -> Int32)
extern pthread_cond_signal:func(cond:&pthread_cond_t -> Int32)
extern pthread_cond_broadcast:func(cond:&pthread_cond_t -> Int32)

struct pthread_cond_t(; extern, opaque):
    # Allocate a condition variable with GC_MALLOC, initialize it with default
    # (NULL) attributes, and register pthread_cond_destroy as its GC finalizer.
    # NOTE: the status returned by pthread_cond_init is not checked.
    func new(->@pthread_cond_t):
        return inline C : @pthread_cond_t {
            pthread_cond_t *handle = GC_MALLOC(sizeof(pthread_cond_t));
            pthread_cond_init(handle, NULL);
            GC_register_finalizer(handle, (void*)pthread_cond_destroy, NULL, NULL, NULL);
            handle
        }

    # Wait on the condition using `mutex`; fails if pthread_cond_wait returns
    # a nonzero status.
    func wait(cond:&pthread_cond_t, mutex:&pthread_mutex_t):
        if pthread_cond_wait(cond, mutex) != 0:
            fail("Failed to wait on condition")

    # Signal the condition; fails if pthread_cond_signal returns a nonzero
    # status.
    func signal(cond:&pthread_cond_t):
        if pthread_cond_signal(cond) != 0:
            fail("Failed to signal pthread_cond_t")

    # Broadcast the condition; fails if pthread_cond_broadcast returns a
    # nonzero status.
    func broadcast(cond:&pthread_cond_t):
        if pthread_cond_broadcast(cond) != 0:
            fail("Failed to broadcast pthread_cond_t")

struct pthread_t(; extern, opaque):
    # Start a new thread running `fn`. The handle is allocated with GC_MALLOC
    # and the closure's function pointer and userdata are handed to
    # pthread_create as the start routine and its argument.
    # NOTE: the status returned by pthread_create is not checked.
    func new(fn:func() -> @pthread_t):
        return inline C : @pthread_t {
            pthread_t *thread = GC_MALLOC(sizeof(pthread_t));
            pthread_create(thread, NULL, _$fn.fn, _$fn.userdata);
            thread
        }

    # Thin wrappers over the corresponding pthread calls. Their return codes
    # are ignored, and the thread's return value is discarded by `join`
    # (NULL is passed as the result pointer).
    func join(p:@pthread_t): inline C { pthread_join(*_$p, NULL); }
    func cancel(p:@pthread_t): inline C { pthread_cancel(*_$p); }
    func detach(p:@pthread_t): inline C { pthread_detach(*_$p); }

    # Misspelled original name for `detach`, kept so existing callers still
    # compile. Prefer `detach` in new code.
    func detatch(p:@pthread_t): inline C { pthread_detach(*_$p); }

# A queue of Ints shared between threads: the `_queue` array is accessed only
# while `_mutex` is locked, and `_cond` is signaled after each insertion so a
# blocked `take` can re-check the queue.
struct IntQueue(_queue:@[Int], _mutex:@pthread_mutex_t, _cond:@pthread_cond_t):
    # Build a queue holding `initial` (empty by default) with a fresh mutex
    # and condition variable.
    func new(initial=[:Int] -> IntQueue):
        return IntQueue(@initial, pthread_mutex_t.new(), pthread_cond_t.new())

    # Insert `n` into the queue while holding the mutex, then signal the
    # condition variable.
    # NOTE(review): do_begin/do_end presumably run on entry to and exit from
    # the `do` block, so the signal is sent after the unlock - confirm.
    func give(q:IntQueue, n:Int):
        do_begin: q._mutex:lock()
        do_end: q._mutex:unlock()
        do: q._queue:insert(n)
        q._cond:signal()

    # Pop and return the item at index 1 of the queue. While the pop yields
    # nothing, wait on the condition variable (passing the held mutex) and
    # try again.
    func take(q:IntQueue -> Int):
        do_begin: q._mutex:lock()
        do_end: q._mutex:unlock()
        do:
            repeat:
                if n := q._queue:pop(1):
                    return n
                q._cond:wait(q._mutex)
        # The `repeat` above has no break; the only way out is `return n`.
        fail("Unreachable")

# Demo: the main thread ("boss") hands jobs to one worker thread through the
# `jobs` queue and reads the worker's answers back from the `results` queue.
func main():
    jobs := IntQueue.new()
    results := IntQueue.new()

    # All console output goes through `announce`, which holds `say_mutex`
    # around the `say` call so messages from the two threads are serialized.
    say_mutex := pthread_mutex_t.new()
    announce := func(speaker:Text, text:Text):
        do_begin: say_mutex:lock()
        do_end: say_mutex:unlock()
        do: say("$\033[2m[$speaker]$\033[m $text")

    # The worker loops forever: take a job, multiply it by 10, give back the
    # result. It never exits on its own; it is cancelled at the end of main.
    worker := pthread_t.new(func():
        # Routed through `announce` (not a bare `say`) so this line takes
        # say_mutex like every other message; the boss may already be
        # printing when the worker starts.
        announce("worker", "I'm in the thread!")
        repeat:
            announce("worker", "waiting for job")
            job := jobs:take()
            result := job * 10
            announce("worker", "Jobbing $job into $result")
            results:give(result)
            announce("worker", "Signaled $result")
    )

    # Queue up all the jobs first...
    for i in 10:
        announce("boss", "Pushing job $i")
        jobs:give(i)
        announce("boss", "Gave job $i")

    # ...then collect one result per job; `take` blocks until one is ready.
    for i in 10:
        announce("boss", "Getting result...")
        result := results:take()
        announce("boss", "Got result $result")

    # Every result has been received, so stop the (otherwise endless) worker.
    >> worker:cancel()