aboutsummaryrefslogtreecommitdiff
path: root/stdlib/channels.c
diff options
context:
space:
mode:
authorBruce Hill <bruce@bruce-hill.com>2024-09-13 20:18:08 -0400
committerBruce Hill <bruce@bruce-hill.com>2024-09-13 20:18:08 -0400
commitc455e7b67d2e55e6ed03e3449203d4e307f5a7dd (patch)
tree27d9d4c77193f7aa1fe3a3c6fe5631d0ccfd59e2 /stdlib/channels.c
parent816aa29b799132acb8c71d4968df6c4619fb2b1d (diff)
Rename builtins/ -> stdlib/
Diffstat (limited to 'stdlib/channels.c')
-rw-r--r--stdlib/channels.c137
1 files changed, 137 insertions, 0 deletions
diff --git a/stdlib/channels.c b/stdlib/channels.c
new file mode 100644
index 00000000..3681b0b8
--- /dev/null
+++ b/stdlib/channels.c
@@ -0,0 +1,137 @@
+// Functions that operate on channels
+
+#include <ctype.h>
+#include <err.h>
+#include <gc.h>
+#include <math.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <sys/param.h>
+
+#include "arrays.h"
+#include "metamethods.h"
+#include "integers.h"
+#include "siphash.h"
+#include "text.h"
+#include "types.h"
+#include "util.h"
+
+public Channel_t *Channel$new(Int_t max_size)
+{
+ if (Int$compare_value(max_size, I_small(0)) <= 0)
+ fail("Cannot create a channel with a size less than one: %ld", max_size);
+ Channel_t *channel = new(Channel_t);
+ channel->items = (Array_t){};
+ channel->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
+ channel->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
+ channel->max_size = Int_to_Int64(max_size, false);
+ return channel;
+}
+
+public void Channel$give(Channel_t *channel, const void *item, bool front, int64_t padded_item_size)
+{
+ (void)pthread_mutex_lock(&channel->mutex);
+ while (channel->items.length >= channel->max_size)
+ pthread_cond_wait(&channel->cond, &channel->mutex);
+ Int_t index = front ? I_small(1) : I_small(0);
+ Array$insert(&channel->items, item, index, padded_item_size);
+ (void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
+}
+
+public void Channel$give_all(Channel_t *channel, Array_t to_give, bool front, int64_t padded_item_size)
+{
+ if (to_give.length == 0) return;
+ (void)pthread_mutex_lock(&channel->mutex);
+ Int_t index = front ? I_small(1) : I_small(0);
+ if (channel->items.length + to_give.length >= channel->max_size) {
+ for (int64_t i = 0; i < to_give.length; i++) {
+ while (channel->items.length >= channel->max_size)
+ pthread_cond_wait(&channel->cond, &channel->mutex);
+ Array$insert(&channel->items, to_give.data + i*to_give.stride, index, padded_item_size);
+ }
+ } else {
+ Array$insert_all(&channel->items, to_give, index, padded_item_size);
+ }
+ (void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
+}
+
+public void Channel$get(Channel_t *channel, void *out, bool front, int64_t item_size, int64_t padded_item_size)
+{
+ (void)pthread_mutex_lock(&channel->mutex);
+ while (channel->items.length == 0)
+ pthread_cond_wait(&channel->cond, &channel->mutex);
+ memcpy(out, channel->items.data + channel->items.stride * (front ? 0 : channel->items.length-1), (size_t)(item_size));
+ Int_t index = front ? I_small(1) : Int64_to_Int(channel->items.length);
+ Array$remove_at(&channel->items, index, I_small(1), padded_item_size);
+ (void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
+}
+
+public void Channel$peek(Channel_t *channel, void *out, bool front, int64_t item_size)
+{
+ (void)pthread_mutex_lock(&channel->mutex);
+ while (channel->items.length == 0)
+ pthread_cond_wait(&channel->cond, &channel->mutex);
+ int64_t index = front ? 0 : channel->items.length-1;
+ memcpy(out, channel->items.data + channel->items.stride*index, (size_t)(item_size));
+ (void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
+}
+
+public Array_t Channel$view(Channel_t *channel)
+{
+ (void)pthread_mutex_lock(&channel->mutex);
+ ARRAY_INCREF(channel->items);
+ Array_t ret = channel->items;
+ (void)pthread_mutex_unlock(&channel->mutex);
+ return ret;
+}
+
+public void Channel$clear(Channel_t *channel)
+{
+ (void)pthread_mutex_lock(&channel->mutex);
+ Array$clear(&channel->items);
+ (void)pthread_mutex_unlock(&channel->mutex);
+ (void)pthread_cond_signal(&channel->cond);
+}
+
+PUREFUNC public uint64_t Channel$hash(Channel_t **channel, const TypeInfo *type)
+{
+ (void)type;
+ return siphash24((void*)*channel, sizeof(Channel_t*));
+}
+
+PUREFUNC public int32_t Channel$compare(Channel_t **x, Channel_t **y, const TypeInfo *type)
+{
+ (void)type;
+ return (*x > *y) - (*x < *y);
+}
+
+PUREFUNC public bool Channel$equal(Channel_t **x, Channel_t **y, const TypeInfo *type)
+{
+ (void)type;
+ return (*x == *y);
+}
+
+public Text_t Channel$as_text(Channel_t **channel, bool colorize, const TypeInfo *type)
+{
+ const TypeInfo *item_type = type->ChannelInfo.item;
+ if (!channel) {
+ Text_t typename = generic_as_text(NULL, false, item_type);
+ return Text$concat(colorize ? Text("\x1b[34;1m|:") : Text("|:"), typename, colorize ? Text("|\x1b[m") : Text("|"));
+ }
+ Text_t typename = generic_as_text(NULL, false, item_type);
+ return Text$concat(
+ colorize ? Text("\x1b[34;1m|:") : Text("|:"),
+ typename,
+ Text("|<"),
+ Int64$hex((int64_t)(void*)*channel, I_small(0), true, true),
+ colorize ? Text(">\x1b[m") : Text(">")
+ );
+}
+
+// vim: ts=4 sw=0 et cino=L2,l1,(0,W4,m1,\:0