diff options
Diffstat (limited to 'builtins')
| -rw-r--r-- | builtins/channel.c | 20 | ||||
| -rw-r--r-- | builtins/channel.h | 2 | ||||
| -rw-r--r-- | builtins/datatypes.h | 1 |
3 files changed, 20 insertions, 3 deletions
diff --git a/builtins/channel.c b/builtins/channel.c index 0b5f7411..cfb398b0 100644 --- a/builtins/channel.c +++ b/builtins/channel.c @@ -17,18 +17,23 @@ #include "types.h" #include "util.h" -public channel_t *Channel$new(void) +public channel_t *Channel$new(int64_t max_size) { + if (max_size <= 0) + fail("Cannot create a channel with a size less than one: %ld", max_size); channel_t *channel = new(channel_t); channel->items = (array_t){}; channel->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; channel->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; + channel->max_size = max_size; return channel; } public void Channel$push(channel_t *channel, const void *item, int64_t padded_item_size) { (void)pthread_mutex_lock(&channel->mutex); + while (channel->items.length >= channel->max_size) + pthread_cond_wait(&channel->cond, &channel->mutex); Array$insert(&channel->items, item, 0, padded_item_size); (void)pthread_mutex_unlock(&channel->mutex); (void)pthread_cond_signal(&channel->cond); @@ -36,8 +41,17 @@ public void Channel$push(channel_t *channel, const void *item, int64_t padded_it public void Channel$push_all(channel_t *channel, array_t to_push, int64_t padded_item_size) { + if (to_push.length == 0) return; (void)pthread_mutex_lock(&channel->mutex); - Array$insert_all(&channel->items, to_push, 0, padded_item_size); + if (channel->items.length + to_push.length >= channel->max_size) { + for (int64_t i = 0; i < to_push.length; i++) { + while (channel->items.length >= channel->max_size) + pthread_cond_wait(&channel->cond, &channel->mutex); + Array$insert(&channel->items, to_push.data + i*to_push.stride, 0, padded_item_size); + } + } else { + Array$insert_all(&channel->items, to_push, 0, padded_item_size); + } (void)pthread_mutex_unlock(&channel->mutex); (void)pthread_cond_signal(&channel->cond); } @@ -50,6 +64,7 @@ public void Channel$pop(channel_t *channel, void *out, int64_t item_size, int64_ memcpy(out, channel->items.data, item_size); Array$remove(&channel->items, 1, 1, padded_item_size); (void)pthread_mutex_unlock(&channel->mutex); + (void)pthread_cond_signal(&channel->cond); } public array_t Channel$view(channel_t *channel) @@ -66,6 +81,7 @@ public void Channel$clear(channel_t *channel) (void)pthread_mutex_lock(&channel->mutex); Array$clear(&channel->items); (void)pthread_mutex_unlock(&channel->mutex); + (void)pthread_cond_signal(&channel->cond); } public uint32_t Channel$hash(const channel_t **channel, const TypeInfo *type) diff --git a/builtins/channel.h b/builtins/channel.h index 2ce71833..6f628726 100644 --- a/builtins/channel.h +++ b/builtins/channel.h @@ -9,7 +9,7 @@ #include "types.h" #include "util.h" -channel_t *Channel$new(void); +channel_t *Channel$new(int64_t max_size); void Channel$push(channel_t *channel, const void *item, int64_t padded_item_size); #define Channel$push_value(channel, item, padded_item_size) ({ __typeof(item) _item = item; Channel$push(channel, &_item, padded_item_size); }) void Channel$push_all(channel_t *channel, array_t to_push, int64_t padded_item_size); diff --git a/builtins/datatypes.h b/builtins/datatypes.h index 41a67a5e..a9f28dc1 100644 --- a/builtins/datatypes.h +++ b/builtins/datatypes.h @@ -62,6 +62,7 @@ typedef struct { array_t items; pthread_mutex_t mutex; pthread_cond_t cond; + int64_t max_size; } channel_t; // vim: ts=4 sw=0 et cino=L2,l1,(0,W4,m1,\:0 |
