aboutsummaryrefslogtreecommitdiff
path: root/builtins/channel.c
diff options
context:
space:
mode:
authorBruce Hill <bruce@bruce-hill.com>2024-08-11 15:04:22 -0400
committerBruce Hill <bruce@bruce-hill.com>2024-08-11 15:04:22 -0400
commitd2f4d07585d1e915365f3aaea6fc696e00a9e26d (patch)
tree12cf2a0f978835bc55db572df8e0d114c9b89494 /builtins/channel.c
parent2ecb5fe885042ca6c25ee0a3e3da070ddec9e07e (diff)
Support channels with maximum size
Diffstat (limited to 'builtins/channel.c')
-rw-r--r--builtins/channel.c20
1 files changed, 18 insertions, 2 deletions
diff --git a/builtins/channel.c b/builtins/channel.c
index 0b5f7411..cfb398b0 100644
--- a/builtins/channel.c
+++ b/builtins/channel.c
@@ -17,18 +17,23 @@
#include "types.h"
#include "util.h"
-public channel_t *Channel$new(void)
+public channel_t *Channel$new(int64_t max_size)
{
+ if (max_size <= 0)
+ fail("Cannot create a channel with a size less than one: %ld", max_size);
channel_t *channel = new(channel_t);
channel->items = (array_t){};
channel->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
channel->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+ channel->max_size = max_size;
return channel;
}
public void Channel$push(channel_t *channel, const void *item, int64_t padded_item_size)
{
(void)pthread_mutex_lock(&channel->mutex);
+ while (channel->items.length >= channel->max_size)
+ pthread_cond_wait(&channel->cond, &channel->mutex);
Array$insert(&channel->items, item, 0, padded_item_size);
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
@@ -36,8 +41,17 @@ public void Channel$push(channel_t *channel, const void *item, int64_t padded_it
public void Channel$push_all(channel_t *channel, array_t to_push, int64_t padded_item_size)
{
+ if (to_push.length == 0) return;
(void)pthread_mutex_lock(&channel->mutex);
- Array$insert_all(&channel->items, to_push, 0, padded_item_size);
+ if (channel->items.length + to_push.length >= channel->max_size) {
+ for (int64_t i = 0; i < to_push.length; i++) {
+ while (channel->items.length >= channel->max_size)
+ pthread_cond_wait(&channel->cond, &channel->mutex);
+ Array$insert(&channel->items, to_push.data + i*to_push.stride, 0, padded_item_size);
+ }
+ } else {
+ Array$insert_all(&channel->items, to_push, 0, padded_item_size);
+ }
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
}
@@ -50,6 +64,7 @@ public void Channel$pop(channel_t *channel, void *out, int64_t item_size, int64_
memcpy(out, channel->items.data, item_size);
Array$remove(&channel->items, 1, 1, padded_item_size);
(void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
}
public array_t Channel$view(channel_t *channel)
@@ -66,6 +81,7 @@ public void Channel$clear(channel_t *channel)
(void)pthread_mutex_lock(&channel->mutex);
Array$clear(&channel->items);
(void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
}
public uint32_t Channel$hash(const channel_t **channel, const TypeInfo *type)