Replace threads with generic mutexed datastructures.

This commit is contained in:
Bruce Hill 2025-01-02 16:24:07 -05:00
parent 2fcf1939bb
commit be384c0caa
26 changed files with 126 additions and 639 deletions

View File

@ -30,7 +30,7 @@ O=-Og
CFLAGS=$(CCONFIG) $(EXTRA) $(CWARN) $(G) $(O) $(OSFLAGS) $(LTO)
CFLAGS_PLACEHOLDER="$$(echo -e '\033[2m<flags...>\033[m')"
LDLIBS=-lgc -lcord -lm -lunistring -lgmp -ldl
BUILTIN_OBJS=stdlib/siphash.o stdlib/arrays.o stdlib/bools.o stdlib/bytes.o stdlib/channels.o stdlib/nums.o stdlib/integers.o \
BUILTIN_OBJS=stdlib/siphash.o stdlib/arrays.o stdlib/bools.o stdlib/bytes.o stdlib/nums.o stdlib/integers.o \
stdlib/pointers.o stdlib/memory.o stdlib/text.o stdlib/threads.o stdlib/c_strings.o stdlib/tables.o \
stdlib/types.o stdlib/util.o stdlib/files.o stdlib/ranges.o stdlib/shell.o stdlib/paths.o stdlib/rng.o \
stdlib/optionals.o stdlib/patterns.o stdlib/metamethods.o stdlib/functiontype.o stdlib/stdlib.o \

View File

@ -77,8 +77,7 @@ of many language features or the other example programs/modules in
- Built-in doctests with syntax highlighting
- [Automatic command line argument parsing with type safety](docs/command-line-parsing.md)
- [Easy interoperability with C](docs/c-interoperability.md)
- Support for [POSIX threads](docs/threads.tm) and thread-safe message passing
queues over [channels](docs/channels.tm).
- Support for [POSIX threads](docs/threads.tm) and mutex-guarded datastructures.
- Built-in [data serialization and deserialization](docs/serialization.md).
## Dependencies

3
ast.c
View File

@ -116,6 +116,7 @@ CORD ast_to_xml(ast_t *ast)
T(Not, "<Not>%r</Not>", ast_to_xml(data.value))
T(HeapAllocate, "<HeapAllocate>%r</HeapAllocate>", ast_to_xml(data.value))
T(StackReference, "<StackReference>%r</StackReference>", ast_to_xml(data.value))
T(Mutexed, "<Mutexed>%r</Mutexed>", ast_to_xml(data.value))
T(Min, "<Min>%r%r%r</Min>", ast_to_xml(data.lhs), ast_to_xml(data.rhs), optional_tagged("key", data.key))
T(Max, "<Max>%r%r%r</Max>", ast_to_xml(data.lhs), ast_to_xml(data.rhs), optional_tagged("key", data.key))
T(Array, "<Array>%r%r</Array>", optional_tagged_type("item-type", data.item_type), ast_list_to_xml(data.items))
@ -127,7 +128,6 @@ CORD ast_to_xml(ast_t *ast)
optional_tagged("default-value", data.default_value),
ast_list_to_xml(data.entries), optional_tagged("fallback", data.fallback))
T(TableEntry, "<TableEntry>%r%r</TableEntry>", ast_to_xml(data.key), ast_to_xml(data.value))
T(Channel, "<Channel>%r%r</Channel>", type_ast_to_xml(data.item_type), optional_tagged("max-size", data.max_size))
T(Comprehension, "<Comprehension>%r%r%r%r%r</Comprehension>", optional_tagged("expr", data.expr),
ast_list_to_xml(data.vars), optional_tagged("iter", data.iter),
optional_tagged("filter", data.filter))
@ -182,7 +182,6 @@ CORD type_ast_to_xml(type_ast_t *t)
data.is_stack ? "yes" : "no", type_ast_to_xml(data.pointed))
T(ArrayTypeAST, "<ArrayType>%r</ArrayType>", type_ast_to_xml(data.item))
T(SetTypeAST, "<TableType>%r</TableType>", type_ast_to_xml(data.item))
T(ChannelTypeAST, "<ChannelType>%r</ChannelType>", type_ast_to_xml(data.item))
T(TableTypeAST, "<TableType>%r %r</TableType>", type_ast_to_xml(data.key), type_ast_to_xml(data.value))
T(FunctionTypeAST, "<FunctionType>%r %r</FunctionType>", arg_list_to_xml(data.args), type_ast_to_xml(data.ret))
T(OptionalTypeAST, "<OptionalType>%r</OptionalType>", data.type)

13
ast.h
View File

@ -75,7 +75,6 @@ typedef enum {
PointerTypeAST,
ArrayTypeAST,
SetTypeAST,
ChannelTypeAST,
TableTypeAST,
FunctionTypeAST,
OptionalTypeAST,
@ -103,7 +102,7 @@ struct type_ast_s {
} PointerTypeAST;
struct {
type_ast_t *item;
} ArrayTypeAST, ChannelTypeAST;
} ArrayTypeAST;
struct {
type_ast_t *key, *value;
ast_t *default_value;
@ -128,9 +127,9 @@ typedef enum {
TextLiteral, TextJoin, PrintStatement,
Declare, Assign,
BinaryOp, UpdateAssign,
Not, Negative, HeapAllocate, StackReference,
Not, Negative, HeapAllocate, StackReference, Mutexed,
Min, Max,
Array, Channel, Set, Table, TableEntry, Comprehension,
Array, Set, Table, TableEntry, Comprehension,
FunctionDef, Lambda,
FunctionCall, MethodCall,
Block,
@ -194,7 +193,7 @@ struct ast_s {
} BinaryOp, UpdateAssign;
struct {
ast_t *value;
} Not, Negative, HeapAllocate, StackReference;
} Not, Negative, HeapAllocate, StackReference, Mutexed;
struct {
ast_t *lhs, *rhs, *key;
} Min, Max;
@ -202,10 +201,6 @@ struct ast_s {
type_ast_t *item_type;
ast_list_t *items;
} Array;
struct {
type_ast_t *item_type;
ast_t *max_size;
} Channel;
struct {
type_ast_t *item_type;
ast_list_t *items;

View File

@ -64,6 +64,11 @@ static bool promote(env_t *env, ast_t *ast, CORD *code, type_t *actual, type_t *
if (!can_promote(actual, needed))
return false;
if (needed->tag == ClosureType && actual->tag == FunctionType) {
*code = CORD_all("((Closure_t){", *code, ", NULL})");
return true;
}
// Optional promotion:
if (needed->tag == OptionalType && type_eq(actual, Match(needed, OptionalType)->type)) {
*code = promote_to_optional(actual, *code);
@ -136,11 +141,6 @@ static bool promote(env_t *env, ast_t *ast, CORD *code, type_t *actual, type_t *
if (needed->tag == TableType && actual->tag == TableType)
return true;
if (needed->tag == ClosureType && actual->tag == FunctionType) {
*code = CORD_all("((Closure_t){", *code, ", NULL})");
return true;
}
if (needed->tag == ClosureType && actual->tag == ClosureType)
return true;
@ -271,7 +271,6 @@ CORD compile_type(type_t *t)
}
case ArrayType: return "Array_t";
case SetType: return "Table_t";
case ChannelType: return "Channel_t*";
case TableType: return "Table_t";
case FunctionType: {
auto fn = Match(t, FunctionType);
@ -298,7 +297,7 @@ CORD compile_type(type_t *t)
type_t *nonnull = Match(t, OptionalType)->type;
switch (nonnull->tag) {
case CStringType: case FunctionType: case ClosureType:
case PointerType: case EnumType: case ChannelType:
case PointerType: case EnumType:
return compile_type(nonnull);
case TextType:
return Match(nonnull, TextType)->lang ? compile_type(nonnull) : "OptionalText_t";
@ -436,7 +435,7 @@ CORD check_none(type_t *t, CORD value)
{
t = Match(t, OptionalType)->type;
if (t->tag == PointerType || t->tag == FunctionType || t->tag == CStringType
|| t->tag == ChannelType || t == THREAD_TYPE)
|| t == THREAD_TYPE)
return CORD_all("(", value, " == NULL)");
else if (t == MATCH_TYPE)
return CORD_all("((", value, ").index.small == 0)");
@ -1582,7 +1581,6 @@ CORD expr_as_text(env_t *env, CORD expr, type_t *t, CORD color)
case TextType: return CORD_asprintf("Text$as_text(stack(%r), %r, %r)", expr, color, compile_type_info(env, t));
case ArrayType: return CORD_asprintf("Array$as_text(stack(%r), %r, %r)", expr, color, compile_type_info(env, t));
case SetType: return CORD_asprintf("Table$as_text(stack(%r), %r, %r)", expr, color, compile_type_info(env, t));
case ChannelType: return CORD_asprintf("Channel$as_text(stack(%r), %r, %r)", expr, color, compile_type_info(env, t));
case TableType: return CORD_asprintf("Table$as_text(stack(%r), %r, %r)", expr, color, compile_type_info(env, t));
case FunctionType: case ClosureType: return CORD_asprintf("Func$as_text(stack(%r), %r, %r)", expr, color, compile_type_info(env, t));
case PointerType: return CORD_asprintf("Pointer$as_text(stack(%r), %r, %r)", expr, color, compile_type_info(env, t));
@ -1988,7 +1986,6 @@ CORD compile_none(type_t *t)
case ArrayType: return "NONE_ARRAY";
case TableType: return "NONE_TABLE";
case SetType: return "NONE_TABLE";
case ChannelType: return "NULL";
case TextType: return "NONE_TEXT";
case CStringType: return "NULL";
case MomentType: return "NONE_MOMENT";
@ -2127,6 +2124,9 @@ CORD compile(env_t *env, ast_t *ast)
else
return CORD_all("stack(", compile(env, subject), ")");
}
case Mutexed: {
return CORD_all("mutexed(", compile(env, Match(ast, Mutexed)->value), ")");
}
case Optional: {
ast_t *value = Match(ast, Optional)->value;
CORD value_code = compile(env, value);
@ -2578,20 +2578,6 @@ CORD compile(env_t *env, ast_t *ast)
return code;
}
}
case Channel: {
auto chan = Match(ast, Channel);
type_t *item_t = parse_type_ast(env, chan->item_type);
if (!can_send_over_channel(item_t))
code_err(ast, "This item type can't be sent over a channel because it contains reference to memory that may not be thread-safe.");
if (chan->max_size) {
CORD max_size = compile(env, chan->max_size);
if (!promote(env, chan->max_size, &max_size, get_type(env, chan->max_size), INT_TYPE))
code_err(chan->max_size, "This value must be an integer, not %T", get_type(env, chan->max_size));
return CORD_all("Channel$new(", max_size, ")");
} else {
return "Channel$new(I(INT32_MAX))";
}
}
case Table: {
auto table = Match(ast, Table);
if (!table->entries) {
@ -3092,41 +3078,6 @@ CORD compile(env_t *env, ast_t *ast)
", ", compile_type_info(env, self_value_t), ")");
} else code_err(ast, "There is no '%s' method for tables", call->name);
}
case ChannelType: {
type_t *item_t = Match(self_value_t, ChannelType)->item_type;
CORD padded_item_size = CORD_asprintf("%ld", type_size(item_t));
arg_t *front_default_end = new(arg_t, .name="front", .type=Type(BoolType), .default_val=FakeAST(Bool, false));
arg_t *front_default_start = new(arg_t, .name="front", .type=Type(BoolType), .default_val=FakeAST(Bool, true));
if (streq(call->name, "give")) {
self = compile_to_pointer_depth(env, call->self, 0, false);
arg_t *arg_spec = new(arg_t, .name="item", .type=item_t, .next=front_default_end);
return CORD_all("Channel$give_value(", self, ", ", compile_arguments(env, ast, arg_spec, call->args), ", ",
padded_item_size, ")");
} else if (streq(call->name, "give_all")) {
self = compile_to_pointer_depth(env, call->self, 0, false);
arg_t *arg_spec = new(arg_t, .name="to_give", .type=Type(ArrayType, .item_type=item_t), .next=front_default_end);
return CORD_all("Channel$give_all(", self, ", ", compile_arguments(env, ast, arg_spec, call->args), ", ",
padded_item_size, ")");
} else if (streq(call->name, "get")) {
self = compile_to_pointer_depth(env, call->self, 0, false);
arg_t *arg_spec = front_default_start;
return CORD_all("Channel$get_value(", self, ", ", compile_arguments(env, ast, arg_spec, call->args),
", ", compile_type(item_t), ", ", padded_item_size, ")");
} else if (streq(call->name, "peek")) {
self = compile_to_pointer_depth(env, call->self, 0, false);
arg_t *arg_spec = front_default_start;
return CORD_all("Channel$peek_value(", self, ", ", compile_arguments(env, ast, arg_spec, call->args),
", ", compile_type(item_t), ")");
} else if (streq(call->name, "clear")) {
self = compile_to_pointer_depth(env, call->self, 0, false);
(void)compile_arguments(env, ast, NULL, call->args);
return CORD_all("Channel$clear(", self, ")");
} else if (streq(call->name, "view")) {
self = compile_to_pointer_depth(env, call->self, 0, false);
(void)compile_arguments(env, ast, NULL, call->args);
return CORD_all("Channel$view(", self, ")");
} else code_err(ast, "There is no '%s' method for channels", call->name);
}
case TableType: {
auto table = Match(self_value_t, TableType);
if (streq(call->name, "get")) {
@ -3599,11 +3550,6 @@ CORD compile(env_t *env, ast_t *ast)
return CORD_all("Int64_to_Int((", compile_to_pointer_depth(env, f->fielded, 0, false), ").length)");
code_err(ast, "There is no %s field on arrays", f->field);
}
case ChannelType: {
if (streq(f->field, "max_size"))
return CORD_all("Int64_to_Int((", compile_to_pointer_depth(env, f->fielded, 0, false), ")->max_size)");
code_err(ast, "There is no %s field on arrays", f->field);
}
case SetType: {
if (streq(f->field, "items"))
return CORD_all("ARRAY_COPY((", compile_to_pointer_depth(env, f->fielded, 0, false), ").entries)");
@ -3842,10 +3788,6 @@ CORD compile_type_info(env_t *env, type_t *t)
type_t *item_type = Match(t, SetType)->item_type;
return CORD_all("Set$info(", compile_type_info(env, item_type), ")");
}
case ChannelType: {
type_t *item_t = Match(t, ChannelType)->item_type;
return CORD_asprintf("Channel$info(%r)", compile_type_info(env, item_t));
}
case TableType: {
auto table = Match(t, TableType);
type_t *key_type = table->key_type;

View File

@ -21,7 +21,6 @@ Information about Tomo's built-in types can be found here:
- [Arrays](arrays.md)
- [Booleans](booleans.md)
- [Bytes](bytes.md)
- [Channels](channels.md)
- [Moment](moments.md)
- [Enums](enums.md)
- [Floating point numbers](nums.md)

View File

@ -1,177 +0,0 @@
# Channels
Channels are a thread-safe message queue for communicating between threads,
although they can also be used as a general-purpose queue.
## Syntax
The syntax to create a channel is `|:T|`, where `T` is the type that will be
passed through the channel. You can also specify a maximum size for the
channel, which will cause giving to block until the recipient has gotten from
the channel if the maximum size is reached.
```tomo
channel := |:Int|
channel:give(10)
channel:give(20)
>> channel:get()
= 10
>> channel:get()
= 20
small_channel := |:Int; max_size=5|
```
## Channel Methods
### `give`
**Description:**
Adds an item to the channel.
**Signature:**
```tomo
func give(channel:|T|, item: T, front: Bool = no -> Void)
```
**Parameters:**
- `channel`: The channel to which the item will be added.
- `item`: The item to add to the channel.
- `front`: Whether to put the item at the front of the channel (as opposed to the back).
**Returns:**
Nothing.
**Example:**
```tomo
>> channel:give("Hello")
```
---
### `give_all`
**Description:**
Adds multiple items to the channel.
**Signature:**
```tomo
func give_all(channel:|T|, items: [T], front: Bool = no -> Void)
```
**Parameters:**
- `channel`: The channel to which the items will be added.
- `items`: The array of items to add to the channel.
- `front`: Whether to put the item at the front of the channel (as opposed to the back).
**Returns:**
Nothing.
**Example:**
```tomo
>> channel:give_all([1, 2, 3])
```
---
### `get`
**Description:**
Removes and returns an item from the channel. If the channel is empty, it waits until an item is available.
**Signature:**
```tomo
func get(channel:|T|, front: Bool = yes -> T)
```
**Parameters:**
- `channel`: The channel from which to remove an item.
- `front`: Whether to put the item at the front of the channel (as opposed to the back).
**Returns:**
The item removed from the channel.
**Example:**
```tomo
>> channel:peek()
= "Hello"
```
---
### `peek`
**Description:**
Returns the next item that will come out of the channel, but without removing
it. If the channel is empty, it waits until an item is available.
**Signature:**
```tomo
func peek(channel:|T|, front: Bool = yes -> T)
```
**Parameters:**
- `channel`: The channel from which to remove an item.
- `front`: Whether to put the item at the front of the channel (as opposed to the back).
**Returns:**
The item removed from the channel.
**Example:**
```tomo
>> channel:get()
= "Hello"
```
---
### `clear`
**Description:**
Removes all items from the channel.
**Signature:**
```tomo
func clear(channel:|T| -> Void)
```
**Parameters:**
- `channel`: The mutable reference to the channel.
**Returns:**
Nothing.
**Example:**
```tomo
>> channel:clear()
```
---
### `view`
**Description:**
Returns a list of all items currently in the channel without removing them.
**Signature:**
```tomo
func channel:view(->[T])
```
**Parameters:**
- `channel`: The channel to view.
**Returns:**
An array of items currently in the channel.
**Example:**
```tomo
>> channel:view()
= [1, 2, 3]
```

View File

@ -127,9 +127,9 @@ for line in lines:
## Implementation Notes
The implementation of optional types is highly efficient and has no memory
overhead for pointers, collection types (arrays, sets, tables, channels),
booleans, texts, enums, nums, or integers (`Int` type only). This is done by
using carefully chosen values, such as `0` for pointers, `2` for booleans, or a
overhead for pointers, collection types (arrays, sets, tables), booleans,
texts, enums, nums, or integers (`Int` type only). This is done by using
carefully chosen values, such as `0` for pointers, `2` for booleans, or a
negative length for arrays. However, for fixed-size integers (`Int64`, `Int32`,
`Int16`, and `Int8`), bytes, and structs, an additional byte is required for
out-of-band information about whether the value is none or not.

View File

@ -2,7 +2,7 @@
Tomo supports POSIX threads (pthreads) through the `Thread` type. The
recommended practice is to have each thread interact with other threads only
through thread-safe Channels with no other shared data.
through mutex-guarded datastructures.
## Thread Methods

40
parse.c
View File

@ -52,7 +52,7 @@ int op_tightness[] = {
static const char *keywords[] = {
"yes", "xor", "while", "when", "use", "unless", "struct", "stop", "skip", "return",
"or", "not", "no", "mod1", "mod", "pass", "lang", "inline", "in", "if",
"or", "not", "no", "mutexed", "mod1", "mod", "pass", "lang", "inline", "in", "if",
"func", "for", "extern", "enum", "else", "do", "deserialize", "defer", "and", "none",
"_min_", "_max_", NULL,
};
@ -82,7 +82,6 @@ static ast_t *parse_optional_conditional_suffix(parse_ctx_t *ctx, ast_t *stmt);
static ast_t *parse_optional_suffix(parse_ctx_t *ctx, ast_t *lhs);
static arg_ast_t *parse_args(parse_ctx_t *ctx, const char **pos);
static type_ast_t *parse_array_type(parse_ctx_t *ctx, const char *pos);
static type_ast_t *parse_channel_type(parse_ctx_t *ctx, const char *pos);
static type_ast_t *parse_func_type(parse_ctx_t *ctx, const char *pos);
static type_ast_t *parse_non_optional_type(parse_ctx_t *ctx, const char *pos);
static type_ast_t *parse_pointer_type(parse_ctx_t *ctx, const char *pos);
@ -94,7 +93,6 @@ static PARSER(parse_array);
static PARSER(parse_assignment);
static PARSER(parse_block);
static PARSER(parse_bool);
static PARSER(parse_channel);
static PARSER(parse_declaration);
static PARSER(parse_defer);
static PARSER(parse_do);
@ -113,6 +111,7 @@ static PARSER(parse_int);
static PARSER(parse_moment);
static PARSER(parse_lambda);
static PARSER(parse_lang_def);
static PARSER(parse_mutexed);
static PARSER(parse_namespace);
static PARSER(parse_negative);
static PARSER(parse_not);
@ -616,15 +615,6 @@ type_ast_t *parse_array_type(parse_ctx_t *ctx, const char *pos) {
return NewTypeAST(ctx->file, start, pos, ArrayTypeAST, .item=type);
}
type_ast_t *parse_channel_type(parse_ctx_t *ctx, const char *pos) {
const char *start = pos;
if (!match(&pos, "|")) return NULL;
type_ast_t *type = expect(ctx, start, &pos, parse_type,
"I couldn't parse a channel item type after this point");
expect_closing(ctx, &pos, "|", "I wasn't able to parse the rest of this channel");
return NewTypeAST(ctx->file, start, pos, ChannelTypeAST, .item=type);
}
type_ast_t *parse_pointer_type(parse_ctx_t *ctx, const char *pos) {
const char *start = pos;
bool is_stack;
@ -667,7 +657,6 @@ type_ast_t *parse_non_optional_type(parse_ctx_t *ctx, const char *pos) {
bool success = (false
|| (type=parse_pointer_type(ctx, pos))
|| (type=parse_array_type(ctx, pos))
|| (type=parse_channel_type(ctx, pos))
|| (type=parse_table_type(ctx, pos))
|| (type=parse_set_type(ctx, pos))
|| (type=parse_type_name(ctx, pos))
@ -751,21 +740,6 @@ static INLINE bool match_separator(const char **pos) { // Either comma or newlin
}
}
PARSER(parse_channel) {
const char *start = pos;
if (!match(&pos, "|:")) return NULL;
type_ast_t *item_type = expect(ctx, pos-1, &pos, parse_type, "I couldn't parse a type for this channel");
ast_t *max_size = NULL;
if (match(&pos, ";")) {
whitespace(&pos);
const char *attr_start = pos;
if (match_word(&pos, "max_size") && match(&pos, "="))
max_size = expect(ctx, attr_start, &pos, parse_int, "I expected a maximum size for this channel");
}
expect_closing(ctx, &pos, "|", "I wasn't able to parse the rest of this channel");
return NewAST(ctx->file, start, pos, Channel, .item_type=item_type, .max_size=max_size);
}
PARSER(parse_array) {
const char *start = pos;
if (!match(&pos, "[")) return NULL;
@ -1289,6 +1263,14 @@ PARSER(parse_stack_reference) {
return ast;
}
PARSER(parse_mutexed) {
const char *start = pos;
if (!match_word(&pos, "mutexed")) return NULL;
spaces(&pos);
ast_t *val = expect(ctx, start, &pos, parse_term, "I expected an expression for this 'mutexed'");
return NewAST(ctx->file, start, pos, Mutexed, .value=val);
}
PARSER(parse_not) {
const char *start = pos;
if (!match_word(&pos, "not")) return NULL;
@ -1633,7 +1615,6 @@ PARSER(parse_term_no_suffix) {
|| (term=parse_deserialize(ctx, pos))
|| (term=parse_var(ctx, pos))
|| (term=parse_array(ctx, pos))
|| (term=parse_channel(ctx, pos))
|| (term=parse_reduction(ctx, pos))
|| (term=parse_pass(ctx, pos))
|| (term=parse_defer(ctx, pos))
@ -1641,6 +1622,7 @@ PARSER(parse_term_no_suffix) {
|| (term=parse_stop(ctx, pos))
|| (term=parse_return(ctx, pos))
|| (term=parse_not(ctx, pos))
|| (term=parse_mutexed(ctx, pos))
|| (term=parse_extern(ctx, pos))
|| (term=parse_inline_c(ctx, pos))
);

View File

@ -19,7 +19,6 @@ some common functionality.
- Arrays: [header](stdlib/arrays.h), [implementation](stdlib/arrays.c)
- Bools: [header](stdlib/bools.h), [implementation](stdlib/bools.c)
- Bytes: [header](stdlib/bytes.h), [implementation](stdlib/bytes.c)
- Channels: [header](stdlib/channels.h), [implementation](stdlib/channels.c)
- C Strings: [header](stdlib/c_strings.h), [implementation](stdlib/c_strings.c)
- Files (used internally only): [header](stdlib/files.h), [implementation](stdlib/files.c)
- Functiontype: [header](stdlib/functiontype.h), [implementation](stdlib/functiontype.c)

View File

@ -1,157 +0,0 @@
// Functions that operate on channels
#include <ctype.h>
#include <err.h>
#include <gc.h>
#include <math.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/param.h>
#include "arrays.h"
#include "metamethods.h"
#include "integers.h"
#include "siphash.h"
#include "text.h"
#include "types.h"
#include "util.h"
public Channel_t *Channel$new(Int_t max_size)
{
if (Int$compare_value(max_size, I_small(0)) <= 0)
fail("Cannot create a channel with a size less than one: %ld", max_size);
Channel_t *channel = new(Channel_t);
channel->items = (Array_t){};
channel->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
channel->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
channel->max_size = Int_to_Int64(max_size, false);
return channel;
}
public void Channel$give(Channel_t *channel, const void *item, bool front, int64_t padded_item_size)
{
(void)pthread_mutex_lock(&channel->mutex);
while (channel->items.length >= channel->max_size)
pthread_cond_wait(&channel->cond, &channel->mutex);
Int_t index = front ? I_small(1) : I_small(0);
Array$insert(&channel->items, item, index, padded_item_size);
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
}
public void Channel$give_all(Channel_t *channel, Array_t to_give, bool front, int64_t padded_item_size)
{
if (to_give.length == 0) return;
(void)pthread_mutex_lock(&channel->mutex);
Int_t index = front ? I_small(1) : I_small(0);
if (channel->items.length + to_give.length >= channel->max_size) {
for (int64_t i = 0; i < to_give.length; i++) {
while (channel->items.length >= channel->max_size)
pthread_cond_wait(&channel->cond, &channel->mutex);
Array$insert(&channel->items, to_give.data + i*to_give.stride, index, padded_item_size);
}
} else {
Array$insert_all(&channel->items, to_give, index, padded_item_size);
}
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
}
public void Channel$get(Channel_t *channel, void *out, bool front, int64_t item_size, int64_t padded_item_size)
{
(void)pthread_mutex_lock(&channel->mutex);
while (channel->items.length == 0)
pthread_cond_wait(&channel->cond, &channel->mutex);
memcpy(out, channel->items.data + channel->items.stride * (front ? 0 : channel->items.length-1), (size_t)(item_size));
Int_t index = front ? I_small(1) : Int64_to_Int(channel->items.length);
Array$remove_at(&channel->items, index, I_small(1), padded_item_size);
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
}
public void Channel$peek(Channel_t *channel, void *out, bool front, int64_t item_size)
{
(void)pthread_mutex_lock(&channel->mutex);
while (channel->items.length == 0)
pthread_cond_wait(&channel->cond, &channel->mutex);
int64_t index = front ? 0 : channel->items.length-1;
memcpy(out, channel->items.data + channel->items.stride*index, (size_t)(item_size));
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
}
public Array_t Channel$view(Channel_t *channel)
{
(void)pthread_mutex_lock(&channel->mutex);
ARRAY_INCREF(channel->items);
Array_t ret = channel->items;
(void)pthread_mutex_unlock(&channel->mutex);
return ret;
}
public void Channel$clear(Channel_t *channel)
{
(void)pthread_mutex_lock(&channel->mutex);
Array$clear(&channel->items);
(void)pthread_mutex_unlock(&channel->mutex);
(void)pthread_cond_signal(&channel->cond);
}
PUREFUNC public uint64_t Channel$hash(const void *channel, const TypeInfo_t *type)
{
(void)type;
return siphash24(*(void**)channel, sizeof(Channel_t*));
}
PUREFUNC public int32_t Channel$compare(const void *x, const void *y, const TypeInfo_t*)
{
return (*(Channel_t**)x > *(Channel_t**)y) - (*(Channel_t**)x < *(Channel_t**)y);
}
PUREFUNC public bool Channel$equal(const void *x, const void *y, const TypeInfo_t*)
{
return (*(void**)x == *(void**)y);
}
public Text_t Channel$as_text(const void *channel, bool colorize, const TypeInfo_t *type)
{
const TypeInfo_t *item_type = type->ChannelInfo.item;
if (!channel) {
Text_t typename = generic_as_text(NULL, false, item_type);
return Text$concat(colorize ? Text("\x1b[34;1m|:") : Text("|:"), typename, colorize ? Text("|\x1b[m") : Text("|"));
}
Text_t typename = generic_as_text(NULL, false, item_type);
return Text$concat(
colorize ? Text("\x1b[34;1m|:") : Text("|:"),
typename,
Text("|<"),
Int64$hex(*(int64_t*)channel, I_small(0), true, true),
colorize ? Text(">\x1b[m") : Text(">")
);
}
public PUREFUNC bool Channel$is_none(const void *obj, const TypeInfo_t*)
{
return *(void**)obj == NULL;
}
public void Channel$serialize(const void *obj, FILE *out, Table_t *pointers, const TypeInfo_t *type)
{
Channel_t *channel = (Channel_t*)obj;
Array$serialize(&channel->items, out, pointers, Array$info(type->ChannelInfo.item));
Int64$serialize(&channel->max_size, out, pointers, &Int64$info);
}
public void Channel$deserialize(FILE *in, void *outval, Array_t *pointers, const TypeInfo_t *type)
{
Channel_t *channel = new(Channel_t);
channel->mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
channel->cond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
Array$deserialize(in, &channel->items, pointers, Array$info(type->ChannelInfo.item));
Int64$deserialize(in, &channel->max_size, pointers, &Int64$info);
*(Channel_t**)outval = channel;
}
// vim: ts=4 sw=0 et cino=L2,l1,(0,W4,m1,\:0

View File

@ -1,45 +0,0 @@
#pragma once
// Functions that operate on channels (thread-safe arrays)
#include <stdbool.h>
#include "datatypes.h"
#include "types.h"
#include "util.h"
Channel_t *Channel$new(Int_t max_size);
void Channel$give(Channel_t *channel, const void *item, bool front, int64_t padded_item_size);
#define Channel$give_value(channel, item, front, padded_item_size) \
({ __typeof(item) _item = item; Channel$give(channel, &_item, front, padded_item_size); })
void Channel$give_all(Channel_t *channel, Array_t to_give, bool front, int64_t padded_item_size);
void Channel$get(Channel_t *channel, void *out, bool front, int64_t item_size, int64_t padded_item_size);
#define Channel$get_value(channel, front, t, padded_item_size) \
({ t _val; Channel$get(channel, &_val, front, sizeof(t), padded_item_size); _val; })
void Channel$peek(Channel_t *channel, void *out, bool front, int64_t item_size);
#define Channel$peek_value(channel, front, t) ({ t _val; Channel$peek(channel, &_val, front, sizeof(t)); _val; })
void Channel$clear(Channel_t *channel);
Array_t Channel$view(Channel_t *channel);
PUREFUNC uint64_t Channel$hash(const void *channel, const TypeInfo_t *type);
PUREFUNC int32_t Channel$compare(const void *x, const void *y, const TypeInfo_t *type);
PUREFUNC bool Channel$equal(const void *x, const void *y, const TypeInfo_t *type);
Text_t Channel$as_text(const void *channel, bool colorize, const TypeInfo_t *type);
PUREFUNC bool Channel$is_none(const void *obj, const TypeInfo_t*);
void Channel$serialize(const void *obj, FILE *out, Table_t *pointers, const TypeInfo_t*);
void Channel$deserialize(FILE *in, void *outval, Array_t *pointers, const TypeInfo_t*);
#define Channel$metamethods ((metamethods_t){ \
.as_text=Channel$as_text, \
.compare=Channel$compare, \
.equal=Channel$equal, \
.hash=Channel$hash, \
.is_none=Channel$is_none, \
.serialize=Channel$serialize, \
.deserialize=Channel$deserialize, \
})
#define Channel$info(item_info) &((TypeInfo_t){.size=sizeof(Channel_t), .align=__alignof__(Channel_t), \
.tag=ChannelInfo, .ChannelInfo.item=item_info, \
.metamethods=Channel$metamethods})
// vim: ts=4 sw=0 et cino=L2,l1,(0,W4,m1,\:0

View File

@ -66,13 +66,6 @@ typedef struct Range_s {
Int_t first, last, step;
} Range_t;
typedef struct {
Array_t items;
pthread_mutex_t mutex;
pthread_cond_t cond;
int64_t max_size;
} Channel_t;
enum text_type { TEXT_SHORT_ASCII, TEXT_ASCII, TEXT_SHORT_GRAPHEMES, TEXT_GRAPHEMES, TEXT_SUBTEXT };
typedef struct Text_s {

View File

@ -5,7 +5,6 @@
#include "arrays.h"
#include "bools.h"
#include "channels.h"
#include "functiontype.h"
#include "integers.h"
#include "metamethods.h"

View File

@ -6,7 +6,6 @@
#include "arrays.h"
#include "bools.h"
#include "bytes.h"
#include "channels.h"
#include "functiontype.h"
#include "integers.h"
#include "metamethods.h"

View File

@ -621,4 +621,34 @@ public void sleep_num(double seconds)
nanosleep(&ts, NULL);
}
static void use_mutexed(Closure_t fn, void *userdata)
{
void (*call)(void*, void*) = fn.fn;
struct data_t {
pthread_mutex_t mutex;
char item[0] __attribute__ ((aligned (8)));
};
pthread_mutex_t *mutex = &((struct data_t*)userdata)->mutex;
pthread_mutex_lock(mutex);
void *mutexed_item = (void*)((struct data_t*)userdata)->item;
call(mutexed_item, fn.userdata);
pthread_mutex_unlock(mutex);
}
public Closure_t _mutexed(const void *item, size_t size)
{
struct data_t {
pthread_mutex_t mutex;
char item[size] __attribute__ ((aligned (8)));
};
struct data_t *userdata = GC_MALLOC(sizeof(struct data_t));
pthread_mutex_init(&userdata->mutex, NULL);
memcpy(userdata->item, item, size);
return (Closure_t){
.fn=(void*)use_mutexed,
.userdata=(void*)userdata,
};
}
// vim: ts=4 sw=0 et cino=L2,l1,(0,W4,m1,\:0

View File

@ -42,5 +42,7 @@ Closure_t spawn(Closure_t fn);
bool pop_flag(char **argv, int *i, const char *flag, Text_t *result);
void print_stack_trace(FILE *out, int start, int stop);
void sleep_num(double seconds);
public Closure_t _mutexed(const void *item, size_t size);
#define mutexed(item) _mutexed((__typeof(item)[1]){item}, sizeof(item))
// vim: ts=4 sw=0 et cino=L2,l1,(0,W4,m1,\:0

View File

@ -5,7 +5,6 @@
#include "arrays.h"
#include "bools.h"
#include "channels.h"
#include "functiontype.h"
#include "metamethods.h"
#include "optionals.h"

View File

@ -11,7 +11,6 @@
#include "bools.h"
#include "bytes.h"
#include "c_strings.h"
#include "channels.h"
#include "datatypes.h"
#include "enums.h"
#include "functiontype.h"

View File

@ -29,7 +29,7 @@ struct TypeInfo_s {
int64_t size, align;
metamethods_t metamethods;
struct { // Anonymous tagged union for convenience
enum { OpaqueInfo, StructInfo, EnumInfo, PointerInfo, TextInfo, ArrayInfo, ChannelInfo, TableInfo, FunctionInfo,
enum { OpaqueInfo, StructInfo, EnumInfo, PointerInfo, TextInfo, ArrayInfo, TableInfo, FunctionInfo,
OptionalInfo, TypeInfoInfo } tag;
union {
struct {} OpaqueInfo;
@ -42,7 +42,7 @@ struct TypeInfo_s {
} TextInfo;
struct {
const TypeInfo_t *item;
} ArrayInfo, ChannelInfo;
} ArrayInfo;
struct {
const TypeInfo_t *key, *value;
} TableInfo;

View File

@ -61,12 +61,6 @@ func maybe_c_string(should_i:Bool->CString?):
else:
return none
func maybe_channel(should_i:Bool->|Int|?):
if should_i:
return |:Int|?
else:
return none
func maybe_thread(should_i:Bool->Thread?):
if should_i:
return Thread.new(func(): pass)
@ -244,19 +238,6 @@ func main():
fail("Truthy: $nope")
else: !! Falsey: $nope
do:
!! ...
!! Channels:
>> yep := maybe_channel(yes)
# No "=" test here because channels use addresses in the text version
>> nope := maybe_channel(no)
= none : |:Int|?
>> if yep: >> yep
else: fail("Falsey: $yep")
>> if nope:
fail("Truthy: $nope")
else: !! Falsey: $nope
do:
!! ...
!! Threads:

View File

@ -1,66 +1,74 @@
enum Job(Increment(x:Int), Decrement(x:Int))
func main():
do:
>> channel := |:Int|
>> channel:give(10)
>> channel:give(20)
>> channel:give(30)
>> channel:view()
= [10, 20, 30]
>> channel:peek()
= 10
>> channel:peek(front=no)
= 30
>> channel:give(-10, front=yes)
>> channel:view()
= [-10, 10, 20, 30]
>> with_nums := mutexed [10, 20, 30]
with_nums(func(nums:&[Int]):
>> nums[]
= [10, 20, 30]
nums:insert(40)
)
with_nums(func(nums:&[Int]):
>> nums[]
= [10, 20, 30, 40]
)
jobs := |:Job; max_size=2|
>> jobs:give(Increment(5))
>> jobs:peek()
= Job.Increment(5)
with_jobs := mutexed [Job.Increment(5)]
enqueue_job := func(job:Job):
with_jobs(func(jobs:&[Job]): jobs:insert(job))
dequeue_job := func():
job := @none:Job
with_jobs(func(jobs:&[Job]): job[] = jobs:pop(1))
job[]
with_results := mutexed [:Int]
enqueue_result := func(result:Int):
with_results(func(results:&[Int]): results:insert(result))
dequeue_result := func():
result := @none:Int
repeat:
with_results(func(results:&[Int]): result[] = results:pop(1))
stop if result[]
sleep(0.00001)
result[]!
results := |:Int; max_size|
>> thread := Thread.new(func():
!! In another thread!
repeat:
>> got := jobs:get()
when got is Increment(x):
>> results:give(x+1)
job := dequeue_job() or stop
when job is Increment(x):
enqueue_result(x + 1)
is Decrement(x):
>> results:give(x-1)
enqueue_result(x - 1)
)
>> jobs:give(Decrement(100))
>> jobs:give(Decrement(100))
>> jobs:give(Decrement(100))
>> jobs:give(Decrement(100))
>> jobs:give(Decrement(100))
>> jobs:give(Decrement(100))
enqueue_job(Decrement(100))
enqueue_job(Decrement(200))
enqueue_job(Decrement(300))
enqueue_job(Decrement(400))
enqueue_job(Decrement(500))
enqueue_job(Decrement(600))
>> results:get()
>> enqueue_job(Increment(1000))
>> dequeue_result()
= 6
>> jobs:give(Increment(1000))
>> results:get()
>> dequeue_result()
= 99
>> results:get()
= 99
>> results:get()
= 99
>> results:get()
= 99
>> results:get()
= 99
>> results:get()
= 99
>> dequeue_result()
= 199
>> dequeue_result()
= 299
>> dequeue_result()
= 399
>> dequeue_result()
= 499
>> dequeue_result()
= 599
>> results:get()
>> dequeue_result()
= 1001
!! Canceling...

View File

@ -71,17 +71,6 @@ type_t *parse_type_ast(env_t *env, type_ast_t *ast)
type_size(item_t), ARRAY_MAX_STRIDE);
return Type(SetType, .item_type=item_t);
}
case ChannelTypeAST: {
type_ast_t *item_type = Match(ast, ChannelTypeAST)->item;
type_t *item_t = parse_type_ast(env, item_type);
if (!item_t) code_err(item_type, "I can't figure out what this type is.");
if (!can_send_over_channel(item_t))
code_err(ast, "This item type can't be sent over a channel because it contains reference to memory that may not be thread-safe.");
if (type_size(item_t) > ARRAY_MAX_STRIDE)
code_err(ast, "This channel holds items that take up %ld bytes, but the maximum supported size is %ld bytes.",
type_size(item_t), ARRAY_MAX_STRIDE);
return Type(ChannelType, .item_type=item_t);
}
case TableTypeAST: {
auto table_type = Match(ast, TableTypeAST);
type_ast_t *key_type_ast = table_type->key;
@ -672,11 +661,6 @@ type_t *get_type(env_t *env, ast_t *ast)
code_err(ast, "Sets cannot hold stack references because the set may outlive the reference's stack frame.");
return Type(SetType, .item_type=item_type);
}
case Channel: {
auto channel = Match(ast, Channel);
type_t *item_type = parse_type_ast(env, channel->item_type);
return Type(ChannelType, .item_type=item_type);
}
case Table: {
auto table = Match(ast, Table);
type_t *key_type = NULL, *value_type = NULL;
@ -787,7 +771,6 @@ type_t *get_type(env_t *env, ast_t *ast)
}
case FunctionCall: {
auto call = Match(ast, FunctionCall);
type_t *fn_type_t = get_type(env, call->fn);
if (!fn_type_t)
code_err(call->fn, "I couldn't find this function");
@ -858,15 +841,6 @@ type_t *get_type(env_t *env, ast_t *ast)
else if (streq(call->name, "without")) return self_value_t;
else code_err(ast, "There is no '%s' method for sets", call->name);
}
case ChannelType: {
if (streq(call->name, "clear")) return Type(VoidType);
else if (streq(call->name, "get")) return Match(self_value_t, ChannelType)->item_type;
else if (streq(call->name, "give")) return Type(VoidType);
else if (streq(call->name, "give_all")) return Type(VoidType);
else if (streq(call->name, "peek")) return Match(self_value_t, ChannelType)->item_type;
else if (streq(call->name, "view")) return Type(ArrayType, .item_type=Match(self_value_t, ChannelType)->item_type);
else code_err(ast, "There is no '%s' method for arrays", call->name);
}
case TableType: {
auto table = Match(self_value_t, TableType);
if (streq(call->name, "bump")) return Type(VoidType);
@ -976,6 +950,12 @@ type_t *get_type(env_t *env, ast_t *ast)
}
code_err(ast, "I only know how to get 'not' of boolean, numeric, and optional pointer types, not %T", t);
}
case Mutexed: {
type_t *item_type = get_type(env, Match(ast, Mutexed)->value);
type_t *user_thunk = Type(ClosureType, Type(FunctionType, .args=new(arg_t, .name="object", .type=Type(PointerType, .pointed=item_type, .is_stack=true)),
.ret=Type(VoidType)));
return Type(ClosureType, Type(FunctionType, .args=new(arg_t, .name="fn", .type=user_thunk), .ret=Type(VoidType)));
}
case BinaryOp: {
auto binop = Match(ast, BinaryOp);
type_t *lhs_t = get_type(env, binop->lhs),

37
types.c
View File

@ -37,10 +37,6 @@ CORD type_to_cord(type_t *t) {
auto array = Match(t, ArrayType);
return CORD_asprintf("[%r]", type_to_cord(array->item_type));
}
case ChannelType: {
auto array = Match(t, ChannelType);
return CORD_asprintf("||%r", type_to_cord(array->item_type));
}
case TableType: {
auto table = Match(t, TableType);
if (table->default_value)
@ -245,7 +241,6 @@ PUREFUNC bool has_heap_memory(type_t *t)
{
switch (t->tag) {
case ArrayType: return true;
case ChannelType: return true;
case TableType: return true;
case SetType: return true;
case PointerType: return true;
@ -269,32 +264,6 @@ PUREFUNC bool has_heap_memory(type_t *t)
}
}
PUREFUNC bool can_send_over_channel(type_t *t)
{
switch (t->tag) {
case ArrayType: return true;
case ChannelType: return true;
case TableType: return true;
case PointerType: return false;
case OptionalType: return can_send_over_channel(Match(t, OptionalType)->type);
case StructType: {
for (arg_t *field = Match(t, StructType)->fields; field; field = field->next) {
if (!can_send_over_channel(field->type))
return false;
}
return true;
}
case EnumType: {
for (tag_t *tag = Match(t, EnumType)->tags; tag; tag = tag->next) {
if (tag->type && !can_send_over_channel(tag->type))
return false;
}
return true;
}
default: return true;
}
}
PUREFUNC bool has_stack_memory(type_t *t)
{
switch (t->tag) {
@ -523,7 +492,6 @@ PUREFUNC size_t type_size(type_t *t)
case TextType: return sizeof(Text_t);
case ArrayType: return sizeof(Array_t);
case SetType: return sizeof(Table_t);
case ChannelType: return sizeof(Channel_t*);
case TableType: return sizeof(Table_t);
case FunctionType: return sizeof(void*);
case ClosureType: return sizeof(struct {void *fn, *userdata;});
@ -607,7 +575,6 @@ PUREFUNC size_t type_align(type_t *t)
case TextType: return __alignof__(Text_t);
case SetType: return __alignof__(Table_t);
case ArrayType: return __alignof__(Array_t);
case ChannelType: return __alignof__(Channel_t*);
case TableType: return __alignof__(Table_t);
case FunctionType: return __alignof__(void*);
case ClosureType: return __alignof__(struct {void *fn, *userdata;});
@ -706,10 +673,6 @@ type_t *get_field_type(type_t *t, const char *field_name)
else if (streq(field_name, "microseconds")) return Type(IntType, .bits=TYPE_IBITS64);
return NULL;
}
case ChannelType: {
if (streq(field_name, "max_size")) return INT_TYPE;
return NULL;
}
default: return NULL;
}
}

View File

@ -51,7 +51,6 @@ struct type_s {
MomentType,
TextType,
ArrayType,
ChannelType,
SetType,
TableType,
FunctionType,
@ -85,7 +84,7 @@ struct type_s {
} TextType;
struct {
type_t *item_type;
} ArrayType, ChannelType;
} ArrayType;
struct {
type_t *item_type;
} SetType;
@ -145,7 +144,6 @@ typedef enum {NUM_PRECISION_EQUAL, NUM_PRECISION_LESS, NUM_PRECISION_MORE, NUM_P
PUREFUNC precision_cmp_e compare_precision(type_t *a, type_t *b);
PUREFUNC bool has_heap_memory(type_t *t);
PUREFUNC bool has_stack_memory(type_t *t);
PUREFUNC bool can_send_over_channel(type_t *t);
PUREFUNC bool can_promote(type_t *actual, type_t *needed);
PUREFUNC const char *enum_single_value_tag(type_t *enum_type, type_t *t);
PUREFUNC bool is_int_type(type_t *t);