diff options
| author | Bruce Hill <bruce@bruce-hill.com> | 2024-08-11 15:04:22 -0400 |
|---|---|---|
| committer | Bruce Hill <bruce@bruce-hill.com> | 2024-08-11 15:04:22 -0400 |
| commit | d2f4d07585d1e915365f3aaea6fc696e00a9e26d (patch) | |
| tree | 12cf2a0f978835bc55db572df8e0d114c9b89494 /builtins/channel.c | |
| parent | 2ecb5fe885042ca6c25ee0a3e3da070ddec9e07e (diff) | |
Support channels with maximum size
Diffstat (limited to 'builtins/channel.c')
| -rw-r--r-- | builtins/channel.c | 20 |
1 files changed, 18 insertions, 2 deletions
diff --git a/builtins/channel.c b/builtins/channel.c index 0b5f7411..cfb398b0 100644 --- a/builtins/channel.c +++ b/builtins/channel.c @@ -17,18 +17,23 @@ #include "types.h" #include "util.h" -public channel_t *Channel$new(void) +public channel_t *Channel$new(int64_t max_size) { + if (max_size <= 0) + fail("Cannot create a channel with a size less than one: %ld", max_size); channel_t *channel = new(channel_t); channel->items = (array_t){}; channel->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; channel->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER; + channel->max_size = max_size; return channel; } public void Channel$push(channel_t *channel, const void *item, int64_t padded_item_size) { (void)pthread_mutex_lock(&channel->mutex); + while (channel->items.length >= channel->max_size) + pthread_cond_wait(&channel->cond, &channel->mutex); Array$insert(&channel->items, item, 0, padded_item_size); (void)pthread_mutex_unlock(&channel->mutex); (void)pthread_cond_signal(&channel->cond); @@ -36,8 +41,17 @@ public void Channel$push(channel_t *channel, const void *item, int64_t padded_it public void Channel$push_all(channel_t *channel, array_t to_push, int64_t padded_item_size) { + if (to_push.length == 0) return; (void)pthread_mutex_lock(&channel->mutex); - Array$insert_all(&channel->items, to_push, 0, padded_item_size); + if (channel->items.length + to_push.length >= channel->max_size) { + for (int64_t i = 0; i < to_push.length; i++) { + while (channel->items.length >= channel->max_size) + pthread_cond_wait(&channel->cond, &channel->mutex); + Array$insert(&channel->items, to_push.data + i*to_push.stride, 0, padded_item_size); + } + } else { + Array$insert_all(&channel->items, to_push, 0, padded_item_size); + } (void)pthread_mutex_unlock(&channel->mutex); (void)pthread_cond_signal(&channel->cond); } @@ -50,6 +64,7 @@ public void Channel$pop(channel_t *channel, void *out, int64_t item_size, int64_ memcpy(out, channel->items.data, item_size); Array$remove(&channel->items, 1, 1, padded_item_size); (void)pthread_mutex_unlock(&channel->mutex); + (void)pthread_cond_signal(&channel->cond); } public array_t Channel$view(channel_t *channel) @@ -66,6 +81,7 @@ public void Channel$clear(channel_t *channel) (void)pthread_mutex_lock(&channel->mutex); Array$clear(&channel->items); (void)pthread_mutex_unlock(&channel->mutex); + (void)pthread_cond_signal(&channel->cond); } public uint32_t Channel$hash(const channel_t **channel, const TypeInfo *type) |
