Mirror of https://github.com/ruby/ruby.git
Use a serial to keep track of Mutex-owning Fiber
Previously the Mutex held a pointer to the owning Fiber itself, which required marking that pointer during GC (marking was only implemented recently; prior to that it was buggy). Using a monotonically increasing integer instead allows us to avoid having a free function and keeps everything simpler. My main motivations for this change are that the root fiber lazily allocates its self object, which makes a correct write barrier implementation challenging, and that I want to avoid pushing Mutexes into the remembered set whenever they are locked by a short-lived Fiber.
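A minimal sketch of the idea outside the Ruby sources (the toy_* names are illustrative, and the plain counter stands in for the atomic one introduced below):

    #include <assert.h>
    #include <stdint.h>

    /* Every fiber is stamped with a unique, never-reused serial at creation. */
    typedef struct { uint64_t serial; } toy_fiber;

    /* The mutex records only the owner's serial, never a pointer, so the GC
     * needs no mark or free hook for the owner field and a dead fiber leaves
     * no dangling reference behind. */
    typedef struct { uint64_t owner_serial; /* 0 means unlocked */ } toy_mutex;

    static uint64_t next_serial = 1; /* the real code bumps this atomically */

    static void toy_fiber_init(toy_fiber *f) { f->serial = next_serial++; }

    static int toy_mutex_trylock(toy_mutex *m, const toy_fiber *f)
    {
        if (m->owner_serial != 0) return 0; /* held by someone else */
        m->owner_serial = f->serial;
        return 1;
    }

    static void toy_mutex_unlock(toy_mutex *m, const toy_fiber *f)
    {
        assert(m->owner_serial == f->serial); /* only the owner may unlock */
        m->owner_serial = 0;
    }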
This commit is contained in:
parent d1b11592af
commit ff1d23eccb
Notes: git 2025-11-20 22:09:18 +00:00

cont.c (18 changes)
@@ -268,6 +268,8 @@ struct rb_fiber_struct {
     unsigned int killed : 1;

+    rb_serial_t serial;
+
     struct coroutine_context context;
     struct fiber_pool_stack stack;
 };
@@ -1010,6 +1012,13 @@ rb_fiber_threadptr(const rb_fiber_t *fiber)
     return fiber->cont.saved_ec.thread_ptr;
 }

+rb_serial_t
+rb_fiber_serial(const rb_fiber_t *fiber)
+{
+    VM_ASSERT(fiber->serial >= 1);
+    return fiber->serial;
+}
+
 static VALUE
 cont_thread_value(const rb_context_t *cont)
 {
@@ -1995,6 +2004,13 @@ fiber_alloc(VALUE klass)
     return TypedData_Wrap_Struct(klass, &fiber_data_type, 0);
 }

+static rb_serial_t
+next_fiber_serial(void)
+{
+    static rbimpl_atomic_uint64_t fiber_serial = 1;
+    return (rb_serial_t)ATOMIC_U64_FETCH_ADD(fiber_serial, 1);
+}
+
 static rb_fiber_t*
 fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
 {
@@ -2011,6 +2027,7 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
     fiber->cont.type = FIBER_CONTEXT;
     fiber->blocking = blocking;
     fiber->killed = 0;
+    fiber->serial = next_fiber_serial();
     cont_init(&fiber->cont, th);

     fiber->cont.saved_ec.fiber_ptr = fiber;
@@ -2563,6 +2580,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th)
     fiber->cont.saved_ec.thread_ptr = th;
     fiber->blocking = 1;
     fiber->killed = 0;
+    fiber->serial = next_fiber_serial();
     fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */
     th->ec = &fiber->cont.saved_ec;
     cont_init_jit_cont(&fiber->cont);
internal/cont.h

@@ -31,5 +31,6 @@ VALUE rb_fiber_inherit_storage(struct rb_execution_context_struct *ec, struct rb
 VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber);
 unsigned int rb_fiberptr_blocking(struct rb_fiber_struct *fiber);
 struct rb_execution_context_struct * rb_fiberptr_get_ec(struct rb_fiber_struct *fiber);
+rb_serial_t rb_fiber_serial(const struct rb_fiber_struct *fiber);

 #endif /* INTERNAL_CONT_H */
@@ -63,4 +63,27 @@ rbimpl_atomic_u64_set_relaxed(volatile rbimpl_atomic_uint64_t *address, uint64_t
 }
 #define ATOMIC_U64_SET_RELAXED(var, val) rbimpl_atomic_u64_set_relaxed(&(var), val)

+static inline uint64_t
+rbimpl_atomic_u64_fetch_add(volatile rbimpl_atomic_uint64_t *ptr, uint64_t val)
+{
+#if defined(HAVE_GCC_ATOMIC_BUILTINS_64)
+    return __atomic_fetch_add(ptr, val, __ATOMIC_SEQ_CST);
+#elif defined(_WIN32)
+    return InterlockedExchangeAdd64((volatile LONG64 *)ptr, val);
+#elif defined(__sun) && defined(HAVE_ATOMIC_H) && (defined(_LP64) || defined(_I32LPx))
+    return atomic_add_64_nv(ptr, val) - val;
+#elif defined(HAVE_STDATOMIC_H)
+    return atomic_fetch_add_explicit((_Atomic uint64_t *)ptr, val, memory_order_seq_cst);
+#else
+    // Fallback using a mutex for platforms without 64-bit atomics
+    static rb_native_mutex_t lock = RB_NATIVE_MUTEX_INITIALIZER;
+    rb_native_mutex_lock(&lock);
+    uint64_t old = *ptr;
+    *ptr = old + val;
+    rb_native_mutex_unlock(&lock);
+    return old;
+#endif
+}
+#define ATOMIC_U64_FETCH_ADD(var, val) rbimpl_atomic_u64_fetch_add(&(var), val)
+
 #endif
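Note that fetch-add returns the value from before the increment: with the counter initialized to 1, the first fiber receives serial 1 and the counter moves to 2, which is what VM_ASSERT(fiber->serial >= 1) in cont.c relies on, and why 0 can be reserved to mean "no owner". A minimal stand-alone illustration of that semantic, using plain C11 atomics rather than the Ruby macros above:

    #include <inttypes.h>
    #include <stdatomic.h>
    #include <stdio.h>

    int main(void)
    {
        _Atomic uint64_t counter = 1;                    /* mirrors fiber_serial = 1 */
        uint64_t first  = atomic_fetch_add(&counter, 1); /* returns 1; counter is now 2 */
        uint64_t second = atomic_fetch_add(&counter, 1); /* returns 2; counter is now 3 */
        printf("first=%" PRIu64 " second=%" PRIu64 "\n", first, second);
        return 0;
    }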
thread.c (14 changes)
@@ -442,8 +442,8 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
         th->keeping_mutexes = mutex->next_mutex;

         // rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th);
-        VM_ASSERT(mutex->fiber);
-        const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
+        VM_ASSERT(mutex->fiber_serial);
+        const char *error_message = rb_mutex_unlock_th(mutex, th, NULL);
         if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
     }
 }
@@ -5263,7 +5263,7 @@ rb_thread_shield_owned(VALUE self)
     rb_mutex_t *m = mutex_ptr(mutex);

-    return m->fiber == GET_EC()->fiber_ptr;
+    return m->fiber_serial == rb_fiber_serial(GET_EC()->fiber_ptr);
 }

 /*
@@ -5282,7 +5282,7 @@ rb_thread_shield_wait(VALUE self)
     if (!mutex) return Qfalse;
     m = mutex_ptr(mutex);
-    if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
+    if (m->fiber_serial == rb_fiber_serial(GET_EC()->fiber_ptr)) return Qnil;
     rb_thread_shield_waiting_inc(self);
     rb_mutex_lock(mutex);
     rb_thread_shield_waiting_dec(self);
@@ -5799,8 +5799,8 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
         if (th->locking_mutex) {
             rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
-            rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
-                        (void *)mutex->fiber, rb_mutex_num_waiting(mutex));
+            rb_str_catf(msg, " mutex:%llu cond:%"PRIuSIZE,
+                        (unsigned long long)mutex->fiber_serial, rb_mutex_num_waiting(mutex));
         }

         {
@@ -5840,7 +5840,7 @@ rb_check_deadlock(rb_ractor_t *r)
         }
         else if (th->locking_mutex) {
             rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
-            if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !ccan_list_empty(&mutex->waitq))) {
+            if (mutex->fiber_serial == rb_fiber_serial(th->ec->fiber_ptr) || (!mutex->fiber_serial && !ccan_list_empty(&mutex->waitq))) {
                 found = 1;
             }
         }
thread_sync.c

@@ -7,7 +7,7 @@ static VALUE rb_eClosedQueueError;

 /* Mutex */
 typedef struct rb_mutex_struct {
-    rb_fiber_t *fiber;
+    rb_serial_t fiber_serial;
     VALUE thread; // even if the fiber is collected, we might need access to the thread in mutex_free
     struct rb_mutex_struct *next_mutex;
     struct ccan_list_head waitq; /* protected by GVL */
@@ -125,28 +125,7 @@ rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
 static bool
 locked_p(rb_mutex_t *mutex)
 {
-    return mutex->fiber != 0;
-}
-
-static void
-mutex_mark(void *ptr)
-{
-    rb_mutex_t *mutex = ptr;
-    VALUE fiber;
-    if (locked_p(mutex)) {
-        fiber = rb_fiberptr_self(mutex->fiber); // rb_fiber_t* doesn't move along with fiber object
-        if (fiber) rb_gc_mark_movable(fiber);
-        rb_gc_mark_movable(mutex->thread);
-    }
-}
-
-static void
-mutex_compact(void *ptr)
-{
-    rb_mutex_t *mutex = ptr;
-    if (locked_p(mutex)) {
-        mutex->thread = rb_gc_location(mutex->thread);
-    }
+    return mutex->fiber_serial != 0;
 }

 static void
@@ -154,7 +133,7 @@ mutex_free(void *ptr)
 {
     rb_mutex_t *mutex = ptr;
     if (locked_p(mutex)) {
-        const char *err = rb_mutex_unlock_th(mutex, rb_thread_ptr(mutex->thread), mutex->fiber);
+        const char *err = rb_mutex_unlock_th(mutex, rb_thread_ptr(mutex->thread), NULL);
         if (err) rb_bug("%s", err);
     }
     ruby_xfree(ptr);
@@ -168,8 +147,8 @@ mutex_memsize(const void *ptr)
 static const rb_data_type_t mutex_data_type = {
     "mutex",
-    {mutex_mark, mutex_free, mutex_memsize, mutex_compact,},
-    0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
+    {NULL, mutex_free, mutex_memsize,},
+    0, 0, RUBY_TYPED_FREE_IMMEDIATELY
 };

 static rb_mutex_t *
@@ -265,11 +244,7 @@ mutex_set_owner(VALUE self, rb_thread_t *th, rb_fiber_t *fiber)
     rb_mutex_t *mutex = mutex_ptr(self);

     mutex->thread = th->self;
-    mutex->fiber = fiber;
-    RB_OBJ_WRITTEN(self, Qundef, th->self);
-    if (fiber) {
-        RB_OBJ_WRITTEN(self, Qundef, rb_fiberptr_self(fiber));
-    }
+    mutex->fiber_serial = rb_fiber_serial(fiber);
 }

 static void
@@ -293,7 +268,7 @@ rb_mutex_trylock(VALUE self)
 {
     rb_mutex_t *mutex = mutex_ptr(self);

-    if (mutex->fiber == 0) {
+    if (mutex->fiber_serial == 0) {
         RUBY_DEBUG_LOG("%p ok", mutex);

         rb_fiber_t *fiber = GET_EC()->fiber_ptr;
@@ -311,7 +286,7 @@ rb_mutex_trylock(VALUE self)
 static VALUE
 mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
 {
-    return RBOOL(mutex->fiber == fiber);
+    return RBOOL(mutex->fiber_serial == rb_fiber_serial(fiber));
 }

 static VALUE
@@ -347,12 +322,12 @@ do_mutex_lock(VALUE self, int interruptible_p)
     }

     if (rb_mutex_trylock(self) == Qfalse) {
-        if (mutex->fiber == fiber) {
+        if (mutex->fiber_serial == rb_fiber_serial(fiber)) {
             rb_raise(rb_eThreadError, "deadlock; recursive locking");
         }

-        while (mutex->fiber != fiber) {
-            VM_ASSERT(mutex->fiber != NULL);
+        while (mutex->fiber_serial != rb_fiber_serial(fiber)) {
+            VM_ASSERT(mutex->fiber_serial != 0);

             VALUE scheduler = rb_fiber_scheduler_current();
             if (scheduler != Qnil) {
@@ -366,12 +341,12 @@ do_mutex_lock(VALUE self, int interruptible_p)
                 rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);

-                if (!mutex->fiber) {
+                if (!mutex->fiber_serial) {
                     mutex_set_owner(self, th, fiber);
                 }
             }
             else {
-                if (!th->vm->thread_ignore_deadlock && rb_fiber_threadptr(mutex->fiber) == th) {
+                if (!th->vm->thread_ignore_deadlock && rb_thread_ptr(mutex->thread) == th) {
                     rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
                 }
@@ -407,7 +382,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
                 ccan_list_del(&sync_waiter.node);

                 // unlocked by another thread while sleeping
-                if (!mutex->fiber) {
+                if (!mutex->fiber_serial) {
                     mutex_set_owner(self, th, fiber);
                 }
@@ -421,12 +396,12 @@ do_mutex_lock(VALUE self, int interruptible_p)
             if (interruptible_p) {
                 /* release mutex before checking for interrupts...as interrupt checking
                  * code might call rb_raise() */
-                if (mutex->fiber == fiber) {
+                if (mutex->fiber_serial == rb_fiber_serial(fiber)) {
                     mutex->thread = Qfalse;
-                    mutex->fiber = NULL;
+                    mutex->fiber_serial = 0;
                 }
                 RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
-                if (!mutex->fiber) {
+                if (!mutex->fiber_serial) {
                     mutex_set_owner(self, th, fiber);
                 }
             }
@@ -446,7 +421,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
         }

         if (saved_ints) th->ec->interrupt_flag = saved_ints;
-        if (mutex->fiber == fiber) mutex_locked(th, fiber, self);
+        if (mutex->fiber_serial == rb_fiber_serial(fiber)) mutex_locked(th, fiber, self);
     }

     RUBY_DEBUG_LOG("%p locked", mutex);
@@ -496,16 +471,16 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
 {
     RUBY_DEBUG_LOG("%p", mutex);

-    if (mutex->fiber == 0) {
+    if (mutex->fiber_serial == 0) {
         return "Attempt to unlock a mutex which is not locked";
     }
-    else if (mutex->fiber != fiber) {
+    else if (fiber && mutex->fiber_serial != rb_fiber_serial(fiber)) {
         return "Attempt to unlock a mutex which is locked by another thread/fiber";
     }

     struct sync_waiter *cur = 0, *next;

-    mutex->fiber = 0;
+    mutex->fiber_serial = 0;
     thread_mutex_remove(th, mutex);

     ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
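Note the new "fiber &&" guard in rb_mutex_unlock_th: the teardown paths that have no current fiber (mutex_free and rb_threadptr_unlock_all_locking_mutexes above) now pass NULL, which skips the ownership check for a forced unlock. Previously those callers passed mutex->fiber, so the check trivially passed.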
@@ -583,7 +558,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
     while (mutexes) {
         mutex = mutexes;
         mutexes = mutex->next_mutex;
-        mutex->fiber = 0;
+        mutex->fiber_serial = 0;
         mutex->next_mutex = 0;
         ccan_list_head_init(&mutex->waitq);
     }