aboutsummaryrefslogtreecommitdiff
path: root/builtins
diff options
context:
space:
mode:
Diffstat (limited to 'builtins')
-rw-r--r--builtins/channel.c20
-rw-r--r--builtins/channel.h2
-rw-r--r--builtins/datatypes.h1
3 files changed, 20 insertions, 3 deletions
diff --git a/builtins/channel.c b/builtins/channel.c
index 0b5f7411..cfb398b0 100644
--- a/builtins/channel.c
+++ b/builtins/channel.c
@@ -17,18 +17,23 @@
#include "types.h"
#include "util.h"
-public channel_t *Channel$new(void)
+public channel_t *Channel$new(int64_t max_size)
{
+ if (max_size <= 0)
+ fail("Cannot create a channel with a size less than one: %ld", max_size);
channel_t *channel = new(channel_t);
channel->items = (array_t){};
channel->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
channel->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+ channel->max_size = max_size;
return channel;
}
public void Channel$push(channel_t *channel, const void *item, int64_t padded_item_size)
{
(void)pthread_mutex_lock(&channel->mutex);
+ while (channel->items.length >= channel->max_size)
+ pthread_cond_wait(&channel->cond, &channel->mutex);
Array$insert(&channel->items, item, 0, padded_item_size);
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
@@ -36,8 +41,17 @@ public void Channel$push(channel_t *channel, const void *item, int64_t padded_it
public void Channel$push_all(channel_t *channel, array_t to_push, int64_t padded_item_size)
{
+ if (to_push.length == 0) return;
(void)pthread_mutex_lock(&channel->mutex);
- Array$insert_all(&channel->items, to_push, 0, padded_item_size);
+ if (channel->items.length + to_push.length >= channel->max_size) {
+ for (int64_t i = 0; i < to_push.length; i++) {
+ while (channel->items.length >= channel->max_size)
+ pthread_cond_wait(&channel->cond, &channel->mutex);
+ Array$insert(&channel->items, to_push.data + i*to_push.stride, 0, padded_item_size);
+ }
+ } else {
+ Array$insert_all(&channel->items, to_push, 0, padded_item_size);
+ }
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
}
@@ -50,6 +64,7 @@ public void Channel$pop(channel_t *channel, void *out, int64_t item_size, int64_
memcpy(out, channel->items.data, item_size);
Array$remove(&channel->items, 1, 1, padded_item_size);
(void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
}
public array_t Channel$view(channel_t *channel)
@@ -66,6 +81,7 @@ public void Channel$clear(channel_t *channel)
(void)pthread_mutex_lock(&channel->mutex);
Array$clear(&channel->items);
(void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
}
public uint32_t Channel$hash(const channel_t **channel, const TypeInfo *type)
diff --git a/builtins/channel.h b/builtins/channel.h
index 2ce71833..6f628726 100644
--- a/builtins/channel.h
+++ b/builtins/channel.h
@@ -9,7 +9,7 @@
#include "types.h"
#include "util.h"
-channel_t *Channel$new(void);
+channel_t *Channel$new(int64_t max_size);
void Channel$push(channel_t *channel, const void *item, int64_t padded_item_size);
#define Channel$push_value(channel, item, padded_item_size) ({ __typeof(item) _item = item; Channel$push(channel, &_item, padded_item_size); })
void Channel$push_all(channel_t *channel, array_t to_push, int64_t padded_item_size);
diff --git a/builtins/datatypes.h b/builtins/datatypes.h
index 41a67a5e..a9f28dc1 100644
--- a/builtins/datatypes.h
+++ b/builtins/datatypes.h
@@ -62,6 +62,7 @@ typedef struct {
array_t items;
pthread_mutex_t mutex;
pthread_cond_t cond;
+ int64_t max_size;
} channel_t;
// vim: ts=4 sw=0 et cino=L2,l1,(0,W4,m1,\:0