aboutsummaryrefslogtreecommitdiff
path: root/builtins/channel.c
blob: cfb398b02e9d53ce860a2f5ffcb432582782b076 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Functions that operate on channels

#include <ctype.h>
#include <err.h>
#include <gc.h>
#include <gc/cord.h>
#include <math.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/param.h>

#include "array.h"
#include "functions.h"
#include "halfsiphash.h"
#include "types.h"
#include "util.h"

public channel_t *Channel$new(int64_t max_size)
{
    if (max_size <= 0)
        fail("Cannot create a channel with a size less than one: %ld", max_size);
    channel_t *channel = new(channel_t);
    channel->items = (array_t){};
    channel->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
    channel->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
    channel->max_size = max_size;
    return channel;
}

public void Channel$push(channel_t *channel, const void *item, int64_t padded_item_size)
{
    (void)pthread_mutex_lock(&channel->mutex);
    while (channel->items.length >= channel->max_size)
        pthread_cond_wait(&channel->cond, &channel->mutex);
    Array$insert(&channel->items, item, 0, padded_item_size);
    (void)pthread_mutex_unlock(&channel->mutex);
    (void)pthread_cond_signal(&channel->cond);
}

public void Channel$push_all(channel_t *channel, array_t to_push, int64_t padded_item_size)
{
    if (to_push.length == 0) return;
    (void)pthread_mutex_lock(&channel->mutex);
    if (channel->items.length + to_push.length >= channel->max_size) {
        for (int64_t i = 0; i < to_push.length; i++) {
            while (channel->items.length >= channel->max_size)
                pthread_cond_wait(&channel->cond, &channel->mutex);
            Array$insert(&channel->items, to_push.data + i*to_push.stride, 0, padded_item_size);
        }
    } else {
        Array$insert_all(&channel->items, to_push, 0, padded_item_size);
    }
    (void)pthread_mutex_unlock(&channel->mutex);
    (void)pthread_cond_signal(&channel->cond);
}

public void Channel$pop(channel_t *channel, void *out, int64_t item_size, int64_t padded_item_size)
{
    (void)pthread_mutex_lock(&channel->mutex);
    while (channel->items.length == 0)
        pthread_cond_wait(&channel->cond, &channel->mutex);
    memcpy(out, channel->items.data, item_size);
    Array$remove(&channel->items, 1, 1, padded_item_size);
    (void)pthread_mutex_unlock(&channel->mutex);
    (void)pthread_cond_signal(&channel->cond);
}

public array_t Channel$view(channel_t *channel)
{
    (void)pthread_mutex_lock(&channel->mutex);
    ARRAY_INCREF(channel->items);
    array_t ret = channel->items;
    (void)pthread_mutex_unlock(&channel->mutex);
    return ret;
}

public void Channel$clear(channel_t *channel)
{
    (void)pthread_mutex_lock(&channel->mutex);
    Array$clear(&channel->items);
    (void)pthread_mutex_unlock(&channel->mutex);
    (void)pthread_cond_signal(&channel->cond);
}

public uint32_t Channel$hash(const channel_t **channel, const TypeInfo *type)
{
    (void)type;
    uint32_t hash;
    halfsiphash(*channel, sizeof(channel_t*), TOMO_HASH_KEY, (uint8_t*)&hash, sizeof(hash));
    return hash;
}

public int32_t Channel$compare(const channel_t **x, const channel_t **y, const TypeInfo *type)
{
    (void)type;
    return (*x > *y) - (*x < *y);
}

bool Channel$equal(const channel_t **x, const channel_t **y, const TypeInfo *type)
{
    (void)type;
    return (*x == *y);
}

CORD Channel$as_text(const channel_t **channel, bool colorize, const TypeInfo *type)
{
    const TypeInfo *item_type = type->ChannelInfo.item;
    if (!channel) {
        CORD typename = generic_as_text(NULL, false, item_type);
        return colorize ? CORD_asprintf("\x1b[34;1m|:%s|\x1b[m", typename) : CORD_all("|:", typename, "|");
    }
    CORD typename = generic_as_text(NULL, false, item_type);
    return CORD_asprintf(colorize ? "\x1b[34;1m|:%s|<%p>\x1b[m" : "|:%s|<%p>", typename, *channel);
}

// vim: ts=4 sw=0 et cino=L2,l1,(0,W4,m1,\:0