mirror of
https://github.com/ruby/ruby.git
synced 2026-01-27 04:24:23 +00:00
If we malloc when the current Ractor is locked, we can deadlock because
GC requires VM lock and Ractor barrier. If another Ractor is waiting on
this Ractor lock, then it will deadlock because the other Ractor will
never join the barrier.
For example, this script deadlocks:
r = Ractor.new do
loop do
Ractor::Port.new
end
end
100000.times do |i|
r.send(nil)
puts i
end
On debug builds, it fails with this assertion error:
vm_sync.c:75: Assertion Failed: vm_lock_enter:cr->sync.locked_by != rb_ractor_self(cr)
On non-debug builds, we can see that it deadlocks in the debugger:
Main Ractor:
frame #3: 0x000000010021fdc4 miniruby`rb_native_mutex_lock(lock=<unavailable>) at thread_pthread.c:115:14
frame #4: 0x0000000100193eb8 miniruby`ractor_send0 [inlined] ractor_lock(r=<unavailable>, file=<unavailable>, line=1180) at ractor.c:73:5
frame #5: 0x0000000100193eb0 miniruby`ractor_send0 [inlined] ractor_send_basket(ec=<unavailable>, rp=0x0000000131092840, b=0x000000011c63de80, raise_on_error=true) at ractor_sync.c:1180:5
frame #6: 0x0000000100193eac miniruby`ractor_send0(ec=<unavailable>, rp=0x0000000131092840, obj=4, move=<unavailable>, raise_on_error=true) at ractor_sync.c:1211:5
Second Ractor:
frame #2: 0x00000001002208d0 miniruby`rb_ractor_sched_barrier_start [inlined] rb_native_cond_wait(cond=<unavailable>, mutex=<unavailable>) at thread_pthread.c:221:13
frame #3: 0x00000001002208cc miniruby`rb_ractor_sched_barrier_start(vm=0x000000013180d600, cr=0x0000000131093460) at thread_pthread.c:1438:13
frame #4: 0x000000010028a328 miniruby`rb_vm_barrier at vm_sync.c:262:13 [artificial]
frame #5: 0x00000001000dfa6c miniruby`gc_start [inlined] rb_gc_vm_barrier at gc.c:179:5
frame #6: 0x00000001000dfa68 miniruby`gc_start [inlined] gc_enter(objspace=0x000000013180fc00, event=gc_enter_event_start, lock_lev=<unavailable>) at default.c:6636:9
frame #7: 0x00000001000dfa48 miniruby`gc_start(objspace=0x000000013180fc00, reason=<unavailable>) at default.c:6361:5
frame #8: 0x00000001000e3fd8 miniruby`objspace_malloc_increase_body [inlined] garbage_collect(objspace=0x000000013180fc00, reason=512) at default.c:6341:15
frame #9: 0x00000001000e3fa4 miniruby`objspace_malloc_increase_body [inlined] garbage_collect_with_gvl(objspace=0x000000013180fc00, reason=512) at default.c:6741:16
frame #10: 0x00000001000e3f88 miniruby`objspace_malloc_increase_body(objspace=0x000000013180fc00, mem=<unavailable>, new_size=<unavailable>, old_size=<unavailable>, type=<unavailable>) at default.c:8007:13
frame #11: 0x00000001000e3c44 miniruby`rb_gc_impl_malloc [inlined] objspace_malloc_fixup(objspace=0x000000013180fc00, mem=0x000000011c700000, size=12582912) at default.c:8085:5
frame #12: 0x00000001000e3c30 miniruby`rb_gc_impl_malloc(objspace_ptr=0x000000013180fc00, size=12582912) at default.c:8182:12
frame #13: 0x00000001000d4584 miniruby`ruby_xmalloc [inlined] ruby_xmalloc_body(size=<unavailable>) at gc.c:5128:12
frame #14: 0x00000001000d4568 miniruby`ruby_xmalloc(size=<unavailable>) at gc.c:5118:34
frame #15: 0x00000001001eb184 miniruby`rb_st_init_existing_table_with_size(tab=0x000000011c2b4b40, type=<unavailable>, size=<unavailable>) at st.c:559:39
frame #16: 0x00000001001ebc74 miniruby`rebuild_table_if_necessary [inlined] rb_st_init_table_with_size(type=0x00000001004f4a78, size=524287) at st.c:585:5
frame #17: 0x00000001001ebc5c miniruby`rebuild_table_if_necessary [inlined] rebuild_table(tab=0x000000013108e2f0) at st.c:753:19
frame #18: 0x00000001001ebbfc miniruby`rebuild_table_if_necessary(tab=0x000000013108e2f0) at st.c:1125:9
frame #19: 0x00000001001eba08 miniruby`rb_st_insert(tab=0x000000013108e2f0, key=262144, value=4767566624) at st.c:1143:5
frame #20: 0x0000000100194b84 miniruby`ractor_port_initialzie [inlined] ractor_add_port(r=0x0000000131093460, id=262144) at ractor_sync.c:399:9
frame #21: 0x0000000100194b58 miniruby`ractor_port_initialzie [inlined] ractor_port_init(rpv=4750065560, r=0x0000000131093460) at ractor_sync.c:87:5
frame #22: 0x0000000100194b34 miniruby`ractor_port_initialzie(self=4750065560) at ractor_sync.c:103:12
1507 lines
34 KiB
C
1507 lines
34 KiB
C
|
|
// this file is included by ractor.c
|
|
|
|
struct ractor_port {
|
|
rb_ractor_t *r;
|
|
st_data_t id_;
|
|
};
|
|
|
|
static st_data_t
|
|
ractor_port_id(const struct ractor_port *rp)
|
|
{
|
|
return rp->id_;
|
|
}
|
|
|
|
static VALUE rb_cRactorPort;
|
|
|
|
static VALUE ractor_receive(rb_execution_context_t *ec, const struct ractor_port *rp);
|
|
static VALUE ractor_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move);
|
|
static VALUE ractor_try_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move);
|
|
static void ractor_add_port(rb_ractor_t *r, st_data_t id);
|
|
|
|
static void
|
|
ractor_port_mark(void *ptr)
|
|
{
|
|
const struct ractor_port *rp = (struct ractor_port *)ptr;
|
|
|
|
if (rp->r) {
|
|
rb_gc_mark(rp->r->pub.self);
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_port_free(void *ptr)
|
|
{
|
|
xfree(ptr);
|
|
}
|
|
|
|
static size_t
|
|
ractor_port_memsize(const void *ptr)
|
|
{
|
|
return sizeof(struct ractor_port);
|
|
}
|
|
|
|
static const rb_data_type_t ractor_port_data_type = {
|
|
"ractor/port",
|
|
{
|
|
ractor_port_mark,
|
|
ractor_port_free,
|
|
ractor_port_memsize,
|
|
NULL, // update
|
|
},
|
|
0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED,
|
|
};
|
|
|
|
static st_data_t
|
|
ractor_genid_for_port(rb_ractor_t *cr)
|
|
{
|
|
// TODO: enough?
|
|
return cr->sync.next_port_id++;
|
|
}
|
|
|
|
static struct ractor_port *
|
|
RACTOR_PORT_PTR(VALUE self)
|
|
{
|
|
VM_ASSERT(rb_typeddata_is_kind_of(self, &ractor_port_data_type));
|
|
struct ractor_port *rp = DATA_PTR(self);
|
|
return rp;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_port_alloc(VALUE klass)
|
|
{
|
|
struct ractor_port *rp;
|
|
VALUE rpv = TypedData_Make_Struct(klass, struct ractor_port, &ractor_port_data_type, rp);
|
|
return rpv;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_port_init(VALUE rpv, rb_ractor_t *r)
|
|
{
|
|
struct ractor_port *rp = RACTOR_PORT_PTR(rpv);
|
|
|
|
rp->r = r;
|
|
RB_OBJ_WRITTEN(rpv, Qundef, r->pub.self);
|
|
rp->id_ = ractor_genid_for_port(r);
|
|
|
|
ractor_add_port(r, ractor_port_id(rp));
|
|
|
|
rb_obj_freeze(rpv);
|
|
|
|
return rpv;
|
|
}
|
|
|
|
/*
|
|
* call-seq:
|
|
* Ractor::Port.new -> new_port
|
|
*
|
|
* Returns a new Ractor::Port object.
|
|
*/
|
|
static VALUE
|
|
ractor_port_initialize(VALUE self)
|
|
{
|
|
return ractor_port_init(self, GET_RACTOR());
|
|
}
|
|
|
|
/* :nodoc: */
|
|
static VALUE
|
|
ractor_port_initialize_copy(VALUE self, VALUE orig)
|
|
{
|
|
struct ractor_port *dst = RACTOR_PORT_PTR(self);
|
|
struct ractor_port *src = RACTOR_PORT_PTR(orig);
|
|
dst->r = src->r;
|
|
RB_OBJ_WRITTEN(self, Qundef, dst->r->pub.self);
|
|
dst->id_ = ractor_port_id(src);
|
|
|
|
return self;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_port_new(rb_ractor_t *r)
|
|
{
|
|
VALUE rpv = ractor_port_alloc(rb_cRactorPort);
|
|
ractor_port_init(rpv, r);
|
|
return rpv;
|
|
}
|
|
|
|
static bool
|
|
ractor_port_p(VALUE self)
|
|
{
|
|
return rb_typeddata_is_kind_of(self, &ractor_port_data_type);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_port_receive(rb_execution_context_t *ec, VALUE self)
|
|
{
|
|
const struct ractor_port *rp = RACTOR_PORT_PTR(self);
|
|
|
|
if (rp->r != rb_ec_ractor_ptr(ec)) {
|
|
rb_raise(rb_eRactorError, "only allowed from the creator Ractor of this port");
|
|
}
|
|
|
|
return ractor_receive(ec, rp);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_port_send(rb_execution_context_t *ec, VALUE self, VALUE obj, VALUE move)
|
|
{
|
|
const struct ractor_port *rp = RACTOR_PORT_PTR(self);
|
|
ractor_send(ec, rp, obj, RTEST(move));
|
|
return self;
|
|
}
|
|
|
|
static bool ractor_closed_port_p(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp);
|
|
static bool ractor_close_port(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp);
|
|
|
|
static VALUE
|
|
ractor_port_closed_p(rb_execution_context_t *ec, VALUE self)
|
|
{
|
|
const struct ractor_port *rp = RACTOR_PORT_PTR(self);
|
|
|
|
if (ractor_closed_port_p(ec, rp->r, rp)) {
|
|
return Qtrue;
|
|
}
|
|
else {
|
|
return Qfalse;
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_port_close(rb_execution_context_t *ec, VALUE self)
|
|
{
|
|
const struct ractor_port *rp = RACTOR_PORT_PTR(self);
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
|
|
if (cr != rp->r) {
|
|
rb_raise(rb_eRactorError, "closing port by other ractors is not allowed");
|
|
}
|
|
|
|
ractor_close_port(ec, cr, rp);
|
|
return self;
|
|
}
|
|
|
|
// ractor-internal
|
|
|
|
// ractor-internal - ractor_basket
|
|
|
|
enum ractor_basket_type {
|
|
// basket is empty
|
|
basket_type_none,
|
|
|
|
// value is available
|
|
basket_type_ref,
|
|
basket_type_copy,
|
|
basket_type_move,
|
|
};
|
|
|
|
struct ractor_basket {
|
|
enum ractor_basket_type type;
|
|
VALUE sender;
|
|
st_data_t port_id;
|
|
|
|
struct {
|
|
VALUE v;
|
|
bool exception;
|
|
} p; // payload
|
|
|
|
struct ccan_list_node node;
|
|
};
|
|
|
|
#if 0
|
|
static inline bool
|
|
ractor_basket_type_p(const struct ractor_basket *b, enum ractor_basket_type type)
|
|
{
|
|
return b->type == type;
|
|
}
|
|
|
|
static inline bool
|
|
ractor_basket_none_p(const struct ractor_basket *b)
|
|
{
|
|
return ractor_basket_type_p(b, basket_type_none);
|
|
}
|
|
#endif
|
|
|
|
static void
|
|
ractor_basket_mark(const struct ractor_basket *b)
|
|
{
|
|
rb_gc_mark(b->p.v);
|
|
}
|
|
|
|
static void
|
|
ractor_basket_free(struct ractor_basket *b)
|
|
{
|
|
xfree(b);
|
|
}
|
|
|
|
static struct ractor_basket *
|
|
ractor_basket_alloc(void)
|
|
{
|
|
struct ractor_basket *b = ALLOC(struct ractor_basket);
|
|
return b;
|
|
}
|
|
|
|
// ractor-internal - ractor_queue
|
|
|
|
struct ractor_queue {
|
|
struct ccan_list_head set;
|
|
bool closed;
|
|
};
|
|
|
|
static void
|
|
ractor_queue_init(struct ractor_queue *rq)
|
|
{
|
|
ccan_list_head_init(&rq->set);
|
|
rq->closed = false;
|
|
}
|
|
|
|
static struct ractor_queue *
|
|
ractor_queue_new(void)
|
|
{
|
|
struct ractor_queue *rq = ALLOC(struct ractor_queue);
|
|
ractor_queue_init(rq);
|
|
return rq;
|
|
}
|
|
|
|
static void
|
|
ractor_queue_mark(const struct ractor_queue *rq)
|
|
{
|
|
const struct ractor_basket *b;
|
|
|
|
ccan_list_for_each(&rq->set, b, node) {
|
|
ractor_basket_mark(b);
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_queue_free(struct ractor_queue *rq)
|
|
{
|
|
struct ractor_basket *b, *nxt;
|
|
|
|
ccan_list_for_each_safe(&rq->set, b, nxt, node) {
|
|
ccan_list_del_init(&b->node);
|
|
ractor_basket_free(b);
|
|
}
|
|
|
|
VM_ASSERT(ccan_list_empty(&rq->set));
|
|
|
|
xfree(rq);
|
|
}
|
|
|
|
RBIMPL_ATTR_MAYBE_UNUSED()
|
|
static size_t
|
|
ractor_queue_size(const struct ractor_queue *rq)
|
|
{
|
|
size_t size = 0;
|
|
const struct ractor_basket *b;
|
|
|
|
ccan_list_for_each(&rq->set, b, node) {
|
|
size++;
|
|
}
|
|
return size;
|
|
}
|
|
|
|
static void
|
|
ractor_queue_close(struct ractor_queue *rq)
|
|
{
|
|
rq->closed = true;
|
|
}
|
|
|
|
static void
|
|
ractor_queue_move(struct ractor_queue *dst_rq, struct ractor_queue *src_rq)
|
|
{
|
|
struct ccan_list_head *src = &src_rq->set;
|
|
struct ccan_list_head *dst = &dst_rq->set;
|
|
|
|
dst->n.next = src->n.next;
|
|
dst->n.prev = src->n.prev;
|
|
dst->n.next->prev = &dst->n;
|
|
dst->n.prev->next = &dst->n;
|
|
ccan_list_head_init(src);
|
|
}
|
|
|
|
#if 0
|
|
static struct ractor_basket *
|
|
ractor_queue_head(rb_ractor_t *r, struct ractor_queue *rq)
|
|
{
|
|
return ccan_list_top(&rq->set, struct ractor_basket, node);
|
|
}
|
|
#endif
|
|
|
|
static bool
|
|
ractor_queue_empty_p(rb_ractor_t *r, const struct ractor_queue *rq)
|
|
{
|
|
return ccan_list_empty(&rq->set);
|
|
}
|
|
|
|
static struct ractor_basket *
|
|
ractor_queue_deq(rb_ractor_t *r, struct ractor_queue *rq)
|
|
{
|
|
VM_ASSERT(GET_RACTOR() == r);
|
|
|
|
return ccan_list_pop(&rq->set, struct ractor_basket, node);
|
|
}
|
|
|
|
static void
|
|
ractor_queue_enq(rb_ractor_t *r, struct ractor_queue *rq, struct ractor_basket *basket)
|
|
{
|
|
ccan_list_add_tail(&rq->set, &basket->node);
|
|
}
|
|
|
|
#if 0
|
|
static void
|
|
rq_dump(const struct ractor_queue *rq)
|
|
{
|
|
int i=0;
|
|
struct ractor_basket *b;
|
|
ccan_list_for_each(&rq->set, b, node) {
|
|
fprintf(stderr, "%d type:%s %p\n", i, basket_type_name(b->type), (void *)b);
|
|
i++;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
static void ractor_delete_port(rb_ractor_t *cr, st_data_t id, bool locked);
|
|
|
|
static struct ractor_queue *
|
|
ractor_get_queue(rb_ractor_t *cr, st_data_t id, bool locked)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
|
|
struct ractor_queue *rq;
|
|
|
|
if (cr->sync.ports && st_lookup(cr->sync.ports, id, (st_data_t *)&rq)) {
|
|
if (rq->closed && ractor_queue_empty_p(cr, rq)) {
|
|
ractor_delete_port(cr, id, locked);
|
|
return NULL;
|
|
}
|
|
else {
|
|
return rq;
|
|
}
|
|
}
|
|
else {
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
// ractor-internal - ports
|
|
|
|
static void
|
|
ractor_add_port(rb_ractor_t *r, st_data_t id)
|
|
{
|
|
struct ractor_queue *rq = ractor_queue_new();
|
|
ASSERT_ractor_unlocking(r);
|
|
|
|
RUBY_DEBUG_LOG("id:%u", (unsigned int)id);
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
st_insert(r->sync.ports, id, (st_data_t)rq);
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
}
|
|
|
|
static void
|
|
ractor_delete_port_locked(rb_ractor_t *cr, st_data_t id)
|
|
{
|
|
ASSERT_ractor_locking(cr);
|
|
|
|
RUBY_DEBUG_LOG("id:%u", (unsigned int)id);
|
|
|
|
struct ractor_queue *rq;
|
|
|
|
if (st_delete(cr->sync.ports, &id, (st_data_t *)&rq)) {
|
|
ractor_queue_free(rq);
|
|
}
|
|
else {
|
|
VM_ASSERT(0);
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_delete_port(rb_ractor_t *cr, st_data_t id, bool locked)
|
|
{
|
|
if (locked) {
|
|
ractor_delete_port_locked(cr, id);
|
|
}
|
|
else {
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
ractor_delete_port_locked(cr, id);
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
}
|
|
}
|
|
|
|
static const struct ractor_port *
|
|
ractor_default_port(rb_ractor_t *r)
|
|
{
|
|
return RACTOR_PORT_PTR(r->sync.default_port_value);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_default_port_value(rb_ractor_t *r)
|
|
{
|
|
return r->sync.default_port_value;
|
|
}
|
|
|
|
static bool
|
|
ractor_closed_port_p(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp)
|
|
{
|
|
VM_ASSERT(rb_ec_ractor_ptr(ec) == rp->r ? 1 : (ASSERT_ractor_locking(rp->r), 1));
|
|
|
|
const struct ractor_queue *rq;
|
|
|
|
if (rp->r->sync.ports && st_lookup(rp->r->sync.ports, ractor_port_id(rp), (st_data_t *)&rq)) {
|
|
return rq->closed;
|
|
}
|
|
else {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
static void ractor_deliver_incoming_messages(rb_execution_context_t *ec, rb_ractor_t *cr);
|
|
static bool ractor_queue_empty_p(rb_ractor_t *r, const struct ractor_queue *rq);
|
|
|
|
static bool
|
|
ractor_close_port(rb_execution_context_t *ec, rb_ractor_t *cr, const struct ractor_port *rp)
|
|
{
|
|
VM_ASSERT(cr == rp->r);
|
|
struct ractor_queue *rq = NULL;
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
ractor_deliver_incoming_messages(ec, cr); // check incoming messages
|
|
|
|
if (st_lookup(rp->r->sync.ports, ractor_port_id(rp), (st_data_t *)&rq)) {
|
|
ractor_queue_close(rq);
|
|
|
|
if (ractor_queue_empty_p(cr, rq)) {
|
|
// delete from the table
|
|
ractor_delete_port(cr, ractor_port_id(rp), true);
|
|
}
|
|
|
|
// TODO: free rq
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
return rq != NULL;
|
|
}
|
|
|
|
static int
|
|
ractor_free_all_ports_i(st_data_t port_id, st_data_t val, st_data_t dat)
|
|
{
|
|
struct ractor_queue *rq = (struct ractor_queue *)val;
|
|
// rb_ractor_t *cr = (rb_ractor_t *)dat;
|
|
|
|
ractor_queue_free(rq);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_free_all_ports(rb_ractor_t *cr)
|
|
{
|
|
if (cr->sync.ports) {
|
|
st_foreach(cr->sync.ports, ractor_free_all_ports_i, (st_data_t)cr);
|
|
st_free_table(cr->sync.ports);
|
|
cr->sync.ports = NULL;
|
|
}
|
|
|
|
if (cr->sync.recv_queue) {
|
|
ractor_queue_free(cr->sync.recv_queue);
|
|
cr->sync.recv_queue = NULL;
|
|
}
|
|
}
|
|
|
|
#if defined(HAVE_WORKING_FORK)
|
|
static void
|
|
ractor_sync_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r)
|
|
{
|
|
ractor_free_all_ports(r);
|
|
r->sync.legacy = Qnil;
|
|
}
|
|
#endif
|
|
|
|
// Ractor#monitor
|
|
|
|
struct ractor_monitor {
|
|
struct ractor_port port;
|
|
struct ccan_list_node node;
|
|
};
|
|
|
|
static void
|
|
ractor_mark_monitors(rb_ractor_t *r)
|
|
{
|
|
const struct ractor_monitor *rm;
|
|
ccan_list_for_each(&r->sync.monitors, rm, node) {
|
|
rb_gc_mark(rm->port.r->pub.self);
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_exit_token(bool exc)
|
|
{
|
|
if (exc) {
|
|
RUBY_DEBUG_LOG("aborted");
|
|
return ID2SYM(idAborted);
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("exited");
|
|
return ID2SYM(idExited);
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_monitor(rb_execution_context_t *ec, VALUE self, VALUE port)
|
|
{
|
|
rb_ractor_t *r = RACTOR_PTR(self);
|
|
bool terminated = false;
|
|
const struct ractor_port *rp = RACTOR_PORT_PTR(port);
|
|
struct ractor_monitor *rm = ALLOC(struct ractor_monitor);
|
|
rm->port = *rp; // copy port information
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (UNDEF_P(r->sync.legacy)) { // not terminated
|
|
RUBY_DEBUG_LOG("OK/r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
|
|
ccan_list_add_tail(&r->sync.monitors, &rm->node);
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("NG/r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
|
|
terminated = true;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
if (terminated) {
|
|
xfree(rm);
|
|
ractor_port_send(ec, port, ractor_exit_token(r->sync.legacy_exc), Qfalse);
|
|
|
|
return Qfalse;
|
|
}
|
|
else {
|
|
return Qtrue;
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_unmonitor(rb_execution_context_t *ec, VALUE self, VALUE port)
|
|
{
|
|
rb_ractor_t *r = RACTOR_PTR(self);
|
|
const struct ractor_port *rp = RACTOR_PORT_PTR(port);
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (UNDEF_P(r->sync.legacy)) { // not terminated
|
|
struct ractor_monitor *rm, *nxt;
|
|
|
|
ccan_list_for_each_safe(&r->sync.monitors, rm, nxt, node) {
|
|
if (ractor_port_id(&rm->port) == ractor_port_id(rp)) {
|
|
RUBY_DEBUG_LOG("r:%u -> port:%u@r%u",
|
|
(unsigned int)rb_ractor_id(r),
|
|
(unsigned int)ractor_port_id(&rm->port),
|
|
(unsigned int)rb_ractor_id(rm->port.r));
|
|
ccan_list_del(&rm->node);
|
|
xfree(rm);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
return self;
|
|
}
|
|
|
|
static void
|
|
ractor_notify_exit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE legacy, bool exc)
|
|
{
|
|
RUBY_DEBUG_LOG("exc:%d", exc);
|
|
VM_ASSERT(!UNDEF_P(legacy));
|
|
VM_ASSERT(cr->sync.legacy == Qundef);
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
ractor_free_all_ports(cr);
|
|
|
|
cr->sync.legacy = legacy;
|
|
cr->sync.legacy_exc = exc;
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
// send token
|
|
|
|
VALUE token = ractor_exit_token(exc);
|
|
struct ractor_monitor *rm, *nxt;
|
|
|
|
ccan_list_for_each_safe(&cr->sync.monitors, rm, nxt, node)
|
|
{
|
|
RUBY_DEBUG_LOG("port:%u@r%u", (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
|
|
|
|
ractor_try_send(ec, &rm->port, token, false);
|
|
|
|
ccan_list_del(&rm->node);
|
|
xfree(rm);
|
|
}
|
|
|
|
VM_ASSERT(ccan_list_empty(&cr->sync.monitors));
|
|
}
|
|
|
|
// ractor-internal - initialize, mark, free, memsize
|
|
|
|
static int
|
|
ractor_mark_ports_i(st_data_t key, st_data_t val, st_data_t data)
|
|
{
|
|
// id -> ractor_queue
|
|
const struct ractor_queue *rq = (struct ractor_queue *)val;
|
|
ractor_queue_mark(rq);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_sync_mark(rb_ractor_t *r)
|
|
{
|
|
rb_gc_mark(r->sync.default_port_value);
|
|
|
|
if (r->sync.ports) {
|
|
ractor_queue_mark(r->sync.recv_queue);
|
|
st_foreach(r->sync.ports, ractor_mark_ports_i, 0);
|
|
}
|
|
|
|
ractor_mark_monitors(r);
|
|
}
|
|
|
|
static int
|
|
ractor_sync_free_ports_i(st_data_t _key, st_data_t val, st_data_t _args)
|
|
{
|
|
struct ractor_queue *queue = (struct ractor_queue *)val;
|
|
|
|
ractor_queue_free(queue);
|
|
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_sync_free(rb_ractor_t *r)
|
|
{
|
|
if (r->sync.recv_queue) {
|
|
ractor_queue_free(r->sync.recv_queue);
|
|
}
|
|
|
|
// maybe NULL
|
|
if (r->sync.ports) {
|
|
st_foreach(r->sync.ports, ractor_sync_free_ports_i, 0);
|
|
st_free_table(r->sync.ports);
|
|
r->sync.ports = NULL;
|
|
}
|
|
}
|
|
|
|
static size_t
|
|
ractor_sync_memsize(const rb_ractor_t *r)
|
|
{
|
|
return st_table_size(r->sync.ports);
|
|
}
|
|
|
|
static void
|
|
ractor_sync_init(rb_ractor_t *r)
|
|
{
|
|
// lock
|
|
rb_native_mutex_initialize(&r->sync.lock);
|
|
|
|
// monitors
|
|
ccan_list_head_init(&r->sync.monitors);
|
|
|
|
// waiters
|
|
ccan_list_head_init(&r->sync.waiters);
|
|
|
|
// receiving queue
|
|
r->sync.recv_queue = ractor_queue_new();
|
|
|
|
// ports
|
|
r->sync.ports = st_init_numtable();
|
|
r->sync.default_port_value = ractor_port_new(r);
|
|
FL_SET_RAW(r->sync.default_port_value, RUBY_FL_SHAREABLE); // only default ports are shareable
|
|
|
|
// legacy
|
|
r->sync.legacy = Qundef;
|
|
|
|
#ifndef RUBY_THREAD_PTHREAD_H
|
|
rb_native_cond_initialize(&r->sync.wakeup_cond);
|
|
#endif
|
|
}
|
|
|
|
// Ractor#value
|
|
|
|
static rb_ractor_t *
|
|
ractor_set_successor_once(rb_ractor_t *r, rb_ractor_t *cr)
|
|
{
|
|
if (r->sync.successor == NULL) {
|
|
rb_ractor_t *successor = ATOMIC_PTR_CAS(r->sync.successor, NULL, cr);
|
|
return successor == NULL ? cr : successor;
|
|
}
|
|
|
|
return r->sync.successor;
|
|
}
|
|
|
|
static VALUE ractor_reset_belonging(VALUE obj);
|
|
|
|
static VALUE
|
|
ractor_make_remote_exception(VALUE cause, VALUE sender)
|
|
{
|
|
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
|
|
rb_ivar_set(err, rb_intern("@ractor"), sender);
|
|
rb_ec_setup_exception(NULL, err, cause);
|
|
return err;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_value(rb_execution_context_t *ec, VALUE self)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
rb_ractor_t *r = RACTOR_PTR(self);
|
|
rb_ractor_t *sr = ractor_set_successor_once(r, cr);
|
|
|
|
if (sr == cr) {
|
|
ractor_reset_belonging(r->sync.legacy);
|
|
|
|
if (r->sync.legacy_exc) {
|
|
rb_exc_raise(ractor_make_remote_exception(r->sync.legacy, self));
|
|
}
|
|
return r->sync.legacy;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "Only the successor ractor can take a value");
|
|
}
|
|
}
|
|
|
|
static VALUE ractor_move(VALUE obj); // in this file
|
|
static VALUE ractor_copy(VALUE obj); // in this file
|
|
|
|
static VALUE
|
|
ractor_prepare_payload(rb_execution_context_t *ec, VALUE obj, enum ractor_basket_type *ptype)
|
|
{
|
|
switch (*ptype) {
|
|
case basket_type_ref:
|
|
return obj;
|
|
case basket_type_move:
|
|
return ractor_move(obj);
|
|
default:
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
*ptype = basket_type_ref;
|
|
return obj;
|
|
}
|
|
else {
|
|
*ptype = basket_type_copy;
|
|
return ractor_copy(obj);
|
|
}
|
|
}
|
|
}
|
|
|
|
static struct ractor_basket *
|
|
ractor_basket_new(rb_execution_context_t *ec, VALUE obj, enum ractor_basket_type type, bool exc)
|
|
{
|
|
VALUE v = ractor_prepare_payload(ec, obj, &type);
|
|
|
|
struct ractor_basket *b = ractor_basket_alloc();
|
|
b->type = type;
|
|
b->p.v = v;
|
|
b->p.exception = exc;
|
|
return b;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_basket_value(struct ractor_basket *b)
|
|
{
|
|
switch (b->type) {
|
|
case basket_type_ref:
|
|
break;
|
|
case basket_type_copy:
|
|
case basket_type_move:
|
|
ractor_reset_belonging(b->p.v);
|
|
break;
|
|
default:
|
|
VM_ASSERT(0); // unreachable
|
|
}
|
|
|
|
VM_ASSERT(!RB_TYPE_P(b->p.v, T_NONE));
|
|
return b->p.v;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_basket_accept(struct ractor_basket *b)
|
|
{
|
|
VALUE v = ractor_basket_value(b);
|
|
|
|
if (b->p.exception) {
|
|
VALUE err = ractor_make_remote_exception(v, b->sender);
|
|
ractor_basket_free(b);
|
|
rb_exc_raise(err);
|
|
}
|
|
|
|
ractor_basket_free(b);
|
|
return v;
|
|
}
|
|
|
|
// Ractor blocking by receive
|
|
|
|
enum ractor_wakeup_status {
|
|
wakeup_none,
|
|
wakeup_by_send,
|
|
wakeup_by_interrupt,
|
|
|
|
// wakeup_by_close,
|
|
};
|
|
|
|
struct ractor_waiter {
|
|
enum ractor_wakeup_status wakeup_status;
|
|
rb_thread_t *th;
|
|
struct ccan_list_node node;
|
|
};
|
|
|
|
#if VM_CHECK_MODE > 0
|
|
static bool
|
|
ractor_waiter_included(rb_ractor_t *cr, rb_thread_t *th)
|
|
{
|
|
ASSERT_ractor_locking(cr);
|
|
|
|
struct ractor_waiter *w;
|
|
|
|
ccan_list_for_each(&cr->sync.waiters, w, node) {
|
|
if (w->th == th) {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
#endif
|
|
|
|
#if USE_RUBY_DEBUG_LOG
|
|
|
|
static const char *
|
|
wakeup_status_str(enum ractor_wakeup_status wakeup_status)
|
|
{
|
|
switch (wakeup_status) {
|
|
case wakeup_none: return "none";
|
|
case wakeup_by_send: return "by_send";
|
|
case wakeup_by_interrupt: return "by_interrupt";
|
|
// case wakeup_by_close: return "by_close";
|
|
}
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
static const char *
|
|
basket_type_name(enum ractor_basket_type type)
|
|
{
|
|
switch (type) {
|
|
case basket_type_none: return "none";
|
|
case basket_type_ref: return "ref";
|
|
case basket_type_copy: return "copy";
|
|
case basket_type_move: return "move";
|
|
}
|
|
VM_ASSERT(0);
|
|
return NULL;
|
|
}
|
|
|
|
#endif // USE_RUBY_DEBUG_LOG
|
|
|
|
#ifdef RUBY_THREAD_PTHREAD_H
|
|
|
|
//
|
|
|
|
#else // win32
|
|
|
|
static void
|
|
ractor_cond_wait(rb_ractor_t *r)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
VALUE locked_by = r->sync.locked_by;
|
|
r->sync.locked_by = Qnil;
|
|
#endif
|
|
rb_native_cond_wait(&r->sync.wakeup_cond, &r->sync.lock);
|
|
|
|
#if RACTOR_CHECK_MODE > 0
|
|
r->sync.locked_by = locked_by;
|
|
#endif
|
|
}
|
|
|
|
static void *
|
|
ractor_wait_no_gvl(void *ptr)
|
|
{
|
|
struct ractor_waiter *waiter = (struct ractor_waiter *)ptr;
|
|
rb_ractor_t *cr = waiter->th->ractor;
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
if (waiter->wakeup_status == wakeup_none) {
|
|
ractor_cond_wait(cr);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
return NULL;
|
|
}
|
|
|
|
static void
|
|
rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf, void *ptr)
|
|
{
|
|
struct ractor_waiter *waiter = (struct ractor_waiter *)ptr;
|
|
|
|
RACTOR_UNLOCK(cr);
|
|
{
|
|
rb_nogvl(ractor_wait_no_gvl, waiter,
|
|
ubf, waiter,
|
|
RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
|
|
}
|
|
RACTOR_LOCK(cr);
|
|
}
|
|
|
|
static void
|
|
rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th)
|
|
{
|
|
// ractor lock is acquired
|
|
rb_native_cond_broadcast(&r->sync.wakeup_cond);
|
|
}
|
|
#endif
|
|
|
|
static bool
|
|
ractor_wakeup_all(rb_ractor_t *r, enum ractor_wakeup_status wakeup_status)
|
|
{
|
|
ASSERT_ractor_unlocking(r);
|
|
|
|
RUBY_DEBUG_LOG("r:%u wakeup:%s", rb_ractor_id(r), wakeup_status_str(wakeup_status));
|
|
|
|
bool wakeup_p = false;
|
|
|
|
RACTOR_LOCK(r);
|
|
while (1) {
|
|
struct ractor_waiter *waiter = ccan_list_pop(&r->sync.waiters, struct ractor_waiter, node);
|
|
|
|
if (waiter) {
|
|
VM_ASSERT(waiter->wakeup_status == wakeup_none);
|
|
|
|
waiter->wakeup_status = wakeup_status;
|
|
rb_ractor_sched_wakeup(r, waiter->th);
|
|
|
|
wakeup_p = true;
|
|
}
|
|
else {
|
|
break;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
return wakeup_p;
|
|
}
|
|
|
|
static void
|
|
ubf_ractor_wait(void *ptr)
|
|
{
|
|
struct ractor_waiter *waiter = (struct ractor_waiter *)ptr;
|
|
|
|
rb_thread_t *th = waiter->th;
|
|
rb_ractor_t *r = th->ractor;
|
|
|
|
// clear ubf and nobody can kick UBF
|
|
th->unblock.func = NULL;
|
|
th->unblock.arg = NULL;
|
|
|
|
rb_native_mutex_unlock(&th->interrupt_lock);
|
|
{
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (waiter->wakeup_status == wakeup_none) {
|
|
RUBY_DEBUG_LOG("waiter:%p", (void *)waiter);
|
|
|
|
waiter->wakeup_status = wakeup_by_interrupt;
|
|
ccan_list_del(&waiter->node);
|
|
|
|
rb_ractor_sched_wakeup(r, waiter->th);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
}
|
|
rb_native_mutex_lock(&th->interrupt_lock);
|
|
}
|
|
|
|
static enum ractor_wakeup_status
|
|
ractor_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
|
|
{
|
|
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
|
|
|
struct ractor_waiter waiter = {
|
|
.wakeup_status = wakeup_none,
|
|
.th = th,
|
|
};
|
|
|
|
RUBY_DEBUG_LOG("wait%s", "");
|
|
|
|
ASSERT_ractor_locking(cr);
|
|
|
|
VM_ASSERT(GET_RACTOR() == cr);
|
|
VM_ASSERT(!ractor_waiter_included(cr, th));
|
|
|
|
ccan_list_add_tail(&cr->sync.waiters, &waiter.node);
|
|
|
|
// resume another ready thread and wait for an event
|
|
rb_ractor_sched_wait(ec, cr, ubf_ractor_wait, &waiter);
|
|
|
|
if (waiter.wakeup_status == wakeup_none) {
|
|
ccan_list_del(&waiter.node);
|
|
}
|
|
|
|
RUBY_DEBUG_LOG("wakeup_status:%s", wakeup_status_str(waiter.wakeup_status));
|
|
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
{
|
|
rb_ec_check_ints(ec);
|
|
}
|
|
RACTOR_LOCK_SELF(cr);
|
|
|
|
VM_ASSERT(!ractor_waiter_included(cr, th));
|
|
return waiter.wakeup_status;
|
|
}
|
|
|
|
static void
|
|
ractor_deliver_incoming_messages(rb_execution_context_t *ec, rb_ractor_t *cr)
|
|
{
|
|
ASSERT_ractor_locking(cr);
|
|
struct ractor_queue *recv_q = cr->sync.recv_queue;
|
|
|
|
struct ractor_basket *b;
|
|
while ((b = ractor_queue_deq(cr, recv_q)) != NULL) {
|
|
ractor_queue_enq(cr, ractor_get_queue(cr, b->port_id, true), b);
|
|
}
|
|
}
|
|
|
|
static bool
|
|
ractor_check_received(rb_ractor_t *cr, struct ractor_queue *messages)
|
|
{
|
|
struct ractor_queue *received_queue = cr->sync.recv_queue;
|
|
bool received = false;
|
|
|
|
ASSERT_ractor_locking(cr);
|
|
|
|
if (ractor_queue_empty_p(cr, received_queue)) {
|
|
RUBY_DEBUG_LOG("empty");
|
|
}
|
|
else {
|
|
received = true;
|
|
|
|
// messages <- incoming
|
|
ractor_queue_init(messages);
|
|
ractor_queue_move(messages, received_queue);
|
|
}
|
|
|
|
VM_ASSERT(ractor_queue_empty_p(cr, received_queue));
|
|
|
|
RUBY_DEBUG_LOG("received:%d", received);
|
|
return received;
|
|
}
|
|
|
|
static void
|
|
ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
|
|
{
|
|
struct ractor_queue messages;
|
|
bool deliverred = false;
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
if (ractor_check_received(cr, &messages)) {
|
|
deliverred = true;
|
|
}
|
|
else {
|
|
ractor_wait(ec, cr);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
if (deliverred) {
|
|
VM_ASSERT(!ractor_queue_empty_p(cr, &messages));
|
|
struct ractor_basket *b;
|
|
|
|
while ((b = ractor_queue_deq(cr, &messages)) != NULL) {
|
|
ractor_queue_enq(cr, ractor_get_queue(cr, b->port_id, false), b);
|
|
}
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, const struct ractor_port *rp)
|
|
{
|
|
struct ractor_queue *rq = ractor_get_queue(cr, ractor_port_id(rp), false);
|
|
|
|
if (rq == NULL) {
|
|
rb_raise(rb_eRactorClosedError, "The port was already closed");
|
|
}
|
|
|
|
struct ractor_basket *b = ractor_queue_deq(cr, rq);
|
|
|
|
if (rq->closed && ractor_queue_empty_p(cr, rq)) {
|
|
ractor_delete_port(cr, ractor_port_id(rp), false);
|
|
}
|
|
|
|
if (b) {
|
|
return ractor_basket_accept(b);
|
|
}
|
|
else {
|
|
return Qundef;
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_receive(rb_execution_context_t *ec, const struct ractor_port *rp)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
VM_ASSERT(cr == rp->r);
|
|
|
|
RUBY_DEBUG_LOG("port:%u", (unsigned int)ractor_port_id(rp));
|
|
|
|
while (1) {
|
|
VALUE v = ractor_try_receive(ec, cr, rp);
|
|
|
|
if (v != Qundef) {
|
|
return v;
|
|
}
|
|
else {
|
|
ractor_wait_receive(ec, cr);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ractor#send
|
|
|
|
static void
|
|
ractor_send_basket(rb_execution_context_t *ec, const struct ractor_port *rp, struct ractor_basket *b, bool raise_on_error)
|
|
{
|
|
bool closed = false;
|
|
|
|
RUBY_DEBUG_LOG("port:%u@r%u b:%s v:%p", (unsigned int)ractor_port_id(rp), rb_ractor_id(rp->r), basket_type_name(b->type), (void *)b->p.v);
|
|
|
|
RACTOR_LOCK(rp->r);
|
|
{
|
|
if (ractor_closed_port_p(ec, rp->r, rp)) {
|
|
closed = true;
|
|
}
|
|
else {
|
|
b->port_id = ractor_port_id(rp);
|
|
ractor_queue_enq(rp->r, rp->r->sync.recv_queue, b);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(rp->r);
|
|
|
|
// NOTE: ref r -> b->p.v is created, but Ractor is unprotected object, so no problem on that.
|
|
|
|
if (!closed) {
|
|
ractor_wakeup_all(rp->r, wakeup_by_send);
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("closed:%u@r%u", (unsigned int)ractor_port_id(rp), rb_ractor_id(rp->r));
|
|
|
|
if (raise_on_error) {
|
|
ractor_basket_free(b);
|
|
rb_raise(rb_eRactorClosedError, "The port was already closed");
|
|
}
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_send0(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move, bool raise_on_error)
|
|
{
|
|
struct ractor_basket *b = ractor_basket_new(ec, obj, RTEST(move) ? basket_type_move : basket_type_none, false);
|
|
ractor_send_basket(ec, rp, b, raise_on_error);
|
|
RB_GC_GUARD(obj);
|
|
return rp->r->pub.self;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move)
|
|
{
|
|
return ractor_send0(ec, rp, obj, move, true);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_try_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move)
|
|
{
|
|
return ractor_send0(ec, rp, obj, move, false);
|
|
}
|
|
|
|
// Ractor::Selector
|
|
|
|
struct ractor_selector {
|
|
struct st_table *ports; // rpv -> rp
|
|
|
|
};
|
|
|
|
static int
|
|
ractor_selector_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
|
|
{
|
|
rb_gc_mark((VALUE)key); // rpv
|
|
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_selector_mark(void *ptr)
|
|
{
|
|
struct ractor_selector *s = ptr;
|
|
|
|
if (s->ports) {
|
|
st_foreach(s->ports, ractor_selector_mark_i, 0);
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_selector_free(void *ptr)
|
|
{
|
|
struct ractor_selector *s = ptr;
|
|
st_free_table(s->ports);
|
|
ruby_xfree(ptr);
|
|
}
|
|
|
|
static size_t
|
|
ractor_selector_memsize(const void *ptr)
|
|
{
|
|
const struct ractor_selector *s = ptr;
|
|
return sizeof(struct ractor_selector) + st_memsize(s->ports);
|
|
}
|
|
|
|
static const rb_data_type_t ractor_selector_data_type = {
|
|
"ractor/selector",
|
|
{
|
|
ractor_selector_mark,
|
|
ractor_selector_free,
|
|
ractor_selector_memsize,
|
|
NULL, // update
|
|
},
|
|
0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED,
|
|
};
|
|
|
|
static struct ractor_selector *
|
|
RACTOR_SELECTOR_PTR(VALUE selv)
|
|
{
|
|
VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));
|
|
return (struct ractor_selector *)DATA_PTR(selv);
|
|
}
|
|
|
|
// Ractor::Selector.new
|
|
|
|
static VALUE
|
|
ractor_selector_create(VALUE klass)
|
|
{
|
|
struct ractor_selector *s;
|
|
VALUE selv = TypedData_Make_Struct(klass, struct ractor_selector, &ractor_selector_data_type, s);
|
|
s->ports = st_init_numtable(); // TODO
|
|
return selv;
|
|
}
|
|
|
|
// Ractor::Selector#add(r)
|
|
|
|
/*
|
|
* call-seq:
|
|
* add(ractor) -> ractor
|
|
*
|
|
* Adds _ractor_ to +self+. Raises an exception if _ractor_ is already added.
|
|
* Returns _ractor_.
|
|
*/
|
|
static VALUE
|
|
ractor_selector_add(VALUE selv, VALUE rpv)
|
|
{
|
|
if (!ractor_port_p(rpv)) {
|
|
rb_raise(rb_eArgError, "Not a Ractor::Port object");
|
|
}
|
|
|
|
struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
const struct ractor_port *rp = RACTOR_PORT_PTR(rpv);
|
|
|
|
if (st_lookup(s->ports, (st_data_t)rpv, NULL)) {
|
|
rb_raise(rb_eArgError, "already added");
|
|
}
|
|
|
|
st_insert(s->ports, (st_data_t)rpv, (st_data_t)rp);
|
|
RB_OBJ_WRITTEN(selv, Qundef, rpv);
|
|
|
|
return selv;
|
|
}
|
|
|
|
// Ractor::Selector#remove(r)
|
|
|
|
/* call-seq:
|
|
* remove(ractor) -> ractor
|
|
*
|
|
* Removes _ractor_ from +self+. Raises an exception if _ractor_ is not added.
|
|
* Returns the removed _ractor_.
|
|
*/
|
|
static VALUE
|
|
ractor_selector_remove(VALUE selv, VALUE rpv)
|
|
{
|
|
if (!ractor_port_p(rpv)) {
|
|
rb_raise(rb_eArgError, "Not a Ractor::Port object");
|
|
}
|
|
|
|
struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
|
|
if (!st_lookup(s->ports, (st_data_t)rpv, NULL)) {
|
|
rb_raise(rb_eArgError, "not added yet");
|
|
}
|
|
|
|
st_delete(s->ports, (st_data_t *)&rpv, NULL);
|
|
|
|
return selv;
|
|
}
|
|
|
|
// Ractor::Selector#clear
|
|
|
|
/*
|
|
* call-seq:
|
|
* clear -> self
|
|
*
|
|
* Removes all ractors from +self+. Raises +self+.
|
|
*/
|
|
static VALUE
|
|
ractor_selector_clear(VALUE selv)
|
|
{
|
|
struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
st_clear(s->ports);
|
|
return selv;
|
|
}
|
|
|
|
/*
|
|
* call-seq:
|
|
* empty? -> true or false
|
|
*
|
|
* Returns +true+ if no ractor is added.
|
|
*/
|
|
static VALUE
|
|
ractor_selector_empty_p(VALUE selv)
|
|
{
|
|
struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
|
|
return s->ports->num_entries == 0 ? Qtrue : Qfalse;
|
|
}
|
|
|
|
// Ractor::Selector#wait
|
|
|
|
struct ractor_selector_wait_data {
|
|
rb_ractor_t *cr;
|
|
rb_execution_context_t *ec;
|
|
bool found;
|
|
VALUE v;
|
|
VALUE rpv;
|
|
};
|
|
|
|
static int
|
|
ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t data)
|
|
{
|
|
struct ractor_selector_wait_data *p = (struct ractor_selector_wait_data *)data;
|
|
const struct ractor_port *rp = (const struct ractor_port *)val;
|
|
|
|
VALUE v = ractor_try_receive(p->ec, p->cr, rp);
|
|
|
|
if (v != Qundef) {
|
|
p->found = true;
|
|
p->v = v;
|
|
p->rpv = (VALUE)key;
|
|
return ST_STOP;
|
|
}
|
|
else {
|
|
return ST_CONTINUE;
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_selector__wait(rb_execution_context_t *ec, VALUE selector)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
struct ractor_selector *s = RACTOR_SELECTOR_PTR(selector);
|
|
|
|
struct ractor_selector_wait_data data = {
|
|
.ec = ec,
|
|
.cr = cr,
|
|
.found = false,
|
|
};
|
|
|
|
while (1) {
|
|
st_foreach(s->ports, ractor_selector_wait_i, (st_data_t)&data);
|
|
|
|
if (data.found) {
|
|
return rb_ary_new_from_args(2, data.rpv, data.v);
|
|
}
|
|
|
|
ractor_wait_receive(ec, cr);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* call-seq:
|
|
* wait(receive: false, yield_value: undef, move: false) -> [ractor, value]
|
|
*
|
|
* Waits until any ractor in _selector_ can be active.
|
|
*/
|
|
static VALUE
|
|
ractor_selector_wait(VALUE selector)
|
|
{
|
|
return ractor_selector__wait(GET_EC(), selector);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_selector_new(int argc, VALUE *ractors, VALUE klass)
|
|
{
|
|
VALUE selector = ractor_selector_create(klass);
|
|
|
|
for (int i=0; i<argc; i++) {
|
|
ractor_selector_add(selector, ractors[i]);
|
|
}
|
|
|
|
return selector;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ports)
|
|
{
|
|
VALUE selector = ractor_selector_new(RARRAY_LENINT(ports), (VALUE *)RARRAY_CONST_PTR(ports), rb_cRactorSelector);
|
|
VALUE result = ractor_selector__wait(ec, selector);
|
|
|
|
RB_GC_GUARD(selector);
|
|
RB_GC_GUARD(ports);
|
|
return result;
|
|
}
|
|
|
|
#ifndef USE_RACTOR_SELECTOR
|
|
#define USE_RACTOR_SELECTOR 0
|
|
#endif
|
|
|
|
RUBY_SYMBOL_EXPORT_BEGIN
|
|
void rb_init_ractor_selector(void);
|
|
RUBY_SYMBOL_EXPORT_END
|
|
|
|
/*
|
|
* Document-class: Ractor::Selector
|
|
* :nodoc: currently
|
|
*
|
|
* Selects multiple Ractors to be activated.
|
|
*/
|
|
void
|
|
rb_init_ractor_selector(void)
|
|
{
|
|
rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
|
|
rb_undef_alloc_func(rb_cRactorSelector);
|
|
|
|
rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1);
|
|
rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1);
|
|
rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1);
|
|
rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0);
|
|
rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0);
|
|
rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, 0);
|
|
}
|
|
|
|
static void
|
|
Init_RactorPort(void)
|
|
{
|
|
rb_cRactorPort = rb_define_class_under(rb_cRactor, "Port", rb_cObject);
|
|
rb_define_alloc_func(rb_cRactorPort, ractor_port_alloc);
|
|
rb_define_method(rb_cRactorPort, "initialize", ractor_port_initialize, 0);
|
|
rb_define_method(rb_cRactorPort, "initialize_copy", ractor_port_initialize_copy, 1);
|
|
|
|
#if USE_RACTOR_SELECTOR
|
|
rb_init_ractor_selector();
|
|
#endif
|
|
}
|