From 81fbdff8fdf2ae7afb2fa19319ff7d40379521fe Mon Sep 17 00:00:00 2001
From: John Hawthorn
Date: Mon, 24 Nov 2025 16:50:29 -0800
Subject: [PATCH] Use continuation bit in concurrent set

This refactors the concurrent set to examine and reserve a slot via CAS
on the hash before doing the same with the key. This allows us to use an
extra bit from the hash as a "continuation bit", which marks whether an
insert has ever probed past this slot. When that bit isn't set at
deletion time we can clear the entry instead of placing a tombstone.
---
 bootstraptest/test_ractor.rb |   3 +
 concurrent_set.c             | 173 +++++++++++++++++++++++++----------
 2 files changed, 129 insertions(+), 47 deletions(-)

diff --git a/bootstraptest/test_ractor.rb b/bootstraptest/test_ractor.rb
index 81dc2d6b8d..13c4652d37 100644
--- a/bootstraptest/test_ractor.rb
+++ b/bootstraptest/test_ractor.rb
@@ -1494,6 +1494,9 @@ assert_equal "ok", %Q{
       unless a[i].equal?(b[i])
         raise [a[i], b[i]].inspect
       end
+      unless a[i] == i.to_s
+        raise [i, a[i], b[i]].inspect
+      end
     end
   :ok
 }
diff --git a/concurrent_set.c b/concurrent_set.c
index 3aa61507aa..eebf7df9cb 100644
--- a/concurrent_set.c
+++ b/concurrent_set.c
@@ -4,6 +4,9 @@
 #include "ruby/atomic.h"
 #include "vm_sync.h"
 
+#define CONCURRENT_SET_CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
+#define CONCURRENT_SET_HASH_MASK (~CONCURRENT_SET_CONTINUATION_BIT)
+
 enum concurrent_set_special_values {
     CONCURRENT_SET_EMPTY,
     CONCURRENT_SET_DELETED,
@@ -24,6 +27,36 @@ struct concurrent_set {
     struct concurrent_set_entry *entries;
 };
 
+static void
+concurrent_set_mark_continuation(struct concurrent_set_entry *entry, VALUE curr_hash_and_flags)
+{
+    if (curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT) return;
+
+    RUBY_ASSERT((curr_hash_and_flags & CONCURRENT_SET_HASH_MASK) != 0);
+
+    VALUE new_hash = curr_hash_and_flags | CONCURRENT_SET_CONTINUATION_BIT;
+    VALUE prev_hash = rbimpl_atomic_value_cas(&entry->hash, curr_hash_and_flags, new_hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+
+    // At the moment we only expect to be racing against another thread
+    // that is also setting the continuation bit.
+    // In the future, if deletion becomes concurrent, this will need adjusting.
+    RUBY_ASSERT(prev_hash == curr_hash_and_flags || prev_hash == new_hash);
+    (void)prev_hash;
+}
+
+static VALUE
+concurrent_set_hash(const struct concurrent_set *set, VALUE key)
+{
+    VALUE hash = set->funcs->hash(key);
+    hash &= CONCURRENT_SET_HASH_MASK;
+    if (hash == 0) {
+        hash ^= CONCURRENT_SET_HASH_MASK;
+    }
+    RUBY_ASSERT(hash != 0);
+    RUBY_ASSERT(!(hash & CONCURRENT_SET_CONTINUATION_BIT));
+    return hash;
+}
+
 static void
 concurrent_set_free(void *ptr)
 {
@@ -141,13 +174,9 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
         if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
         if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;
 
-        VALUE hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
-        if (hash == 0) {
-            // Either in-progress insert or extremely unlikely 0 hash.
-            // Re-calculate the hash.
-            hash = old_set->funcs->hash(key);
-        }
-        RUBY_ASSERT(hash == old_set->funcs->hash(key));
+        VALUE hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED) & CONCURRENT_SET_HASH_MASK;
+        RUBY_ASSERT(hash != 0);
+        RUBY_ASSERT(hash == concurrent_set_hash(old_set, key));
 
         // Insert key into new_set.
        struct concurrent_set_probe probe;
@@ -156,20 +185,19 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
         while (true) {
             struct concurrent_set_entry *entry = &new_set->entries[idx];
 
-            if (entry->key == CONCURRENT_SET_EMPTY) {
-                new_set->size++;
+            if (entry->hash == CONCURRENT_SET_EMPTY) {
+                RUBY_ASSERT(entry->key == CONCURRENT_SET_EMPTY);
 
+                new_set->size++;
                 RUBY_ASSERT(new_set->size <= new_set->capacity / 2);
-                RUBY_ASSERT(entry->hash == 0);
 
                 entry->key = key;
                 entry->hash = hash;
                 break;
             }
-            else {
-                RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
-            }
 
+            RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
+            entry->hash |= CONCURRENT_SET_CONTINUATION_BIT;
             idx = concurrent_set_probe_next(&probe);
         }
     }
@@ -203,20 +231,37 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
     if (hash == 0) {
         // We don't need to recompute the hash on every retry because it should
         // never change.
-        hash = set->funcs->hash(key);
+        hash = concurrent_set_hash(set, key);
     }
-    RUBY_ASSERT(hash == set->funcs->hash(key));
+    RUBY_ASSERT(hash == concurrent_set_hash(set, key));
 
     struct concurrent_set_probe probe;
     int idx = concurrent_set_probe_start(&probe, set, hash);
 
     while (true) {
         struct concurrent_set_entry *entry = &set->entries[idx];
+        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
+        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;
+
+        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
+            return 0;
+        }
+
+        if (curr_hash != hash) {
+            if (!continuation) {
+                return 0;
+            }
+            idx = concurrent_set_probe_next(&probe);
+            continue;
+        }
+
         VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
 
         switch (curr_key) {
           case CONCURRENT_SET_EMPTY:
-            return 0;
+            // In-progress insert: hash written but key not yet written.
+            break;
           case CONCURRENT_SET_DELETED:
             break;
           case CONCURRENT_SET_MOVED:
@@ -225,13 +270,9 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
             goto retry;
           default: {
-            VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
-            if (curr_hash != 0 && curr_hash != hash) break;
-
             if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                 // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
-                // Skip it and mark it as deleted.
-                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+                // Skip it and let the GC pass clean it up.
                 break;
             }
 
@@ -241,6 +282,10 @@
                 return curr_key;
             }
 
+            if (!continuation) {
+                return 0;
+            }
+
             break;
           }
         }
@@ -266,23 +311,49 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
     if (hash == 0) {
         // We don't need to recompute the hash on every retry because it should
         // never change.
-        hash = set->funcs->hash(key);
+        hash = concurrent_set_hash(set, key);
     }
-    RUBY_ASSERT(hash == set->funcs->hash(key));
+    RUBY_ASSERT(hash == concurrent_set_hash(set, key));
 
     struct concurrent_set_probe probe;
     int idx = concurrent_set_probe_start(&probe, set, hash);
 
     while (true) {
         struct concurrent_set_entry *entry = &set->entries[idx];
+        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
+        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+
+        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
+            if (!inserting) {
+                key = set->funcs->create(key, data);
+                RUBY_ASSERT(hash == concurrent_set_hash(set, key));
+                inserting = true;
+            }
+
+            // Reserve this slot for our hash value
+            curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+            if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
+                // Lost race, retry same slot to check winner's hash
+                continue;
+            }
+
+            // CAS succeeded, so these are the values stored
+            curr_hash_and_flags = hash;
+            curr_hash = hash;
+            // Fall through to try to claim key
+        }
+
+        if (curr_hash != hash) {
+            goto probe_next;
+        }
+
         VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
 
         switch (curr_key) {
-          case CONCURRENT_SET_EMPTY: {
-            // Not in set
+          case CONCURRENT_SET_EMPTY:
             if (!inserting) {
                 key = set->funcs->create(key, data);
-                RUBY_ASSERT(hash == set->funcs->hash(key));
+                RUBY_ASSERT(hash == concurrent_set_hash(set, key));
                 inserting = true;
             }
 
@@ -293,14 +364,11 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
             if (UNLIKELY(load_factor_reached)) {
                 concurrent_set_try_resize(set_obj, set_obj_ptr);
-
                 goto retry;
             }
 
-            curr_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
-            if (curr_key == CONCURRENT_SET_EMPTY) {
-                rbimpl_atomic_value_store(&entry->hash, hash, RBIMPL_ATOMIC_RELAXED);
-
+            VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+            if (prev_key == CONCURRENT_SET_EMPTY) {
                 RB_GC_GUARD(set_obj);
                 return key;
             }
@@ -311,22 +379,16 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
                 // Another thread won the race, try again at the same location.
                 continue;
             }
-          }
           case CONCURRENT_SET_DELETED:
             break;
           case CONCURRENT_SET_MOVED:
             // Wait
             RB_VM_LOCKING();
-
             goto retry;
-          default: {
-            VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
-            if (curr_hash != 0 && curr_hash != hash) break;
-
+          default:
             if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                 // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
-                // Skip it and mark it as deleted.
-                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+                // Skip it and let the GC pass clean it up.
                 break;
             }
 
@@ -343,15 +405,33 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
                 return curr_key;
             }
 
-            break;
-          }
         }
 
+      probe_next:
+        RUBY_ASSERT(curr_hash_and_flags != CONCURRENT_SET_EMPTY);
+        concurrent_set_mark_continuation(entry, curr_hash_and_flags);
         idx = concurrent_set_probe_next(&probe);
     }
 }
 
+static void
+concurrent_set_delete_entry_locked(struct concurrent_set *set, struct concurrent_set_entry *entry)
+{
+    ASSERT_vm_locking_with_barrier();
+
+    if (entry->hash & CONCURRENT_SET_CONTINUATION_BIT) {
+        entry->hash = CONCURRENT_SET_CONTINUATION_BIT;
+        entry->key = CONCURRENT_SET_DELETED;
+        set->deleted_entries++;
+    }
+    else {
+        entry->hash = CONCURRENT_SET_EMPTY;
+        entry->key = CONCURRENT_SET_EMPTY;
+        set->size--;
+    }
+}
+
 VALUE
 rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
 {
@@ -359,7 +439,7 @@
 
     struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
 
-    VALUE hash = set->funcs->hash(key);
+    VALUE hash = concurrent_set_hash(set, key);
 
     struct concurrent_set_probe probe;
     int idx = concurrent_set_probe_start(&probe, set, hash);
@@ -379,8 +459,8 @@
             break;
           default:
             if (key == curr_key) {
-                entry->key = CONCURRENT_SET_DELETED;
-                set->deleted_entries++;
+                RUBY_ASSERT((entry->hash & CONCURRENT_SET_HASH_MASK) == hash);
+                concurrent_set_delete_entry_locked(set, entry);
                 return curr_key;
             }
             break;
@@ -399,7 +479,7 @@ rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key
     for (unsigned int i = 0; i < set->capacity; i++) {
         struct concurrent_set_entry *entry = &set->entries[i];
 
-        VALUE key = set->entries[i].key;
+        VALUE key = entry->key;
 
         switch (key) {
           case CONCURRENT_SET_EMPTY:
@@ -414,8 +494,7 @@
               case ST_STOP:
                 return;
              case ST_DELETE:
-                set->entries[i].key = CONCURRENT_SET_DELETED;
-                set->deleted_entries++;
+                concurrent_set_delete_entry_locked(set, entry);
                 break;
             }
             break;
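
Reviewer note: below is a minimal, single-threaded sketch of the slot protocol this
patch introduces, intended only to make the continuation-bit invariant easy to poke
at in isolation. The toy_* names, the table size, the multiplicative hash constant,
and the use of plain linear probing (standing in for concurrent_set_probe_next) are
invented for the example; the real concurrent_set.c performs the hash and key
transitions with atomic CAS and additionally handles resizing, MOVED entries, and
weak references, none of which is modeled here.

#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#define TOY_CONTINUATION_BIT ((uint64_t)1 << 63)
#define TOY_HASH_MASK (~TOY_CONTINUATION_BIT)
#define TOY_CAPACITY 8 /* the toy never resizes; keep it well under full */

struct toy_entry {
    uint64_t hash; /* 0 = never used; top bit = an insert probed past here */
    uint64_t key;  /* 0 = no key stored (empty or deleted); keys are nonzero */
};

static struct toy_entry table[TOY_CAPACITY];

/* Mirrors concurrent_set_hash(): mask off the top bit and avoid 0. */
static uint64_t
toy_hash(uint64_t key)
{
    uint64_t hash = key * 0x9e3779b97f4a7c15ull; /* any mixer works here */
    hash &= TOY_HASH_MASK;
    if (hash == 0) hash ^= TOY_HASH_MASK;
    return hash;
}

static void
toy_insert(uint64_t key)
{
    uint64_t hash = toy_hash(key);
    for (size_t idx = hash % TOY_CAPACITY; ; idx = (idx + 1) % TOY_CAPACITY) {
        struct toy_entry *entry = &table[idx];
        if (entry->hash == 0) {
            /* Claim the slot: hash first, then key (both CAS in the real code). */
            entry->hash = hash;
            entry->key = key;
            return;
        }
        if ((entry->hash & TOY_HASH_MASK) == hash && entry->key == key) {
            return; /* already present */
        }
        /* Probing past an occupied slot: record that lookups for colliding
         * keys must continue past it even if it is deleted later. */
        entry->hash |= TOY_CONTINUATION_BIT;
    }
}

static bool
toy_find(uint64_t key)
{
    uint64_t hash = toy_hash(key);
    for (size_t idx = hash % TOY_CAPACITY; ; idx = (idx + 1) % TOY_CAPACITY) {
        struct toy_entry *entry = &table[idx];
        if (entry->hash == 0) return false; /* truly empty: end of chain */
        if ((entry->hash & TOY_HASH_MASK) == hash && entry->key == key) return true;
        /* Mismatch: only keep probing if an insert once probed past here. */
        if (!(entry->hash & TOY_CONTINUATION_BIT)) return false;
    }
}

static void
toy_delete(uint64_t key)
{
    uint64_t hash = toy_hash(key);
    for (size_t idx = hash % TOY_CAPACITY; ; idx = (idx + 1) % TOY_CAPACITY) {
        struct toy_entry *entry = &table[idx];
        if (entry->hash == 0) return; /* not present */
        if ((entry->hash & TOY_HASH_MASK) == hash && entry->key == key) {
            if (entry->hash & TOY_CONTINUATION_BIT) {
                /* Inside someone's probe chain: must leave a tombstone. */
                entry->hash = TOY_CONTINUATION_BIT;
            }
            else {
                /* Nothing probed past this slot: it can become empty again. */
                entry->hash = 0;
            }
            entry->key = 0;
            return;
        }
        if (!(entry->hash & TOY_CONTINUATION_BIT)) return;
    }
}

int
main(void)
{
    toy_insert(1);
    toy_insert(2);
    toy_insert(3);
    toy_delete(2);
    assert(toy_find(1) && !toy_find(2) && toy_find(3));
    printf("ok\n");
    return 0;
}

The property worth checking is in toy_delete: a slot that no insert ever probed past
returns to truly empty, so tombstones only persist on slots that lie inside some
key's probe chain. Before this patch, every deletion left a tombstone that survived
until the next resize.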