Attempt to reuse garbage slots in concurrent hash

This removes all allocations from the find_or_insert loop, which
requires us to start the search over after calling the provided create
function.

In exchange, this allows us to assume that all concurrently inserting
threads will get the same view of the GC state, and so should all
attempt to clear and reuse a slot containing a garbage object.
Author: John Hawthorn  2025-12-09 02:17:38 -08:00
parent 81fbdff8fd
commit 462df17f86
Notes: git 2025-12-10 06:48:36 +00:00
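
The reshaped flow is easiest to see in miniature. Below is a minimal sketch,
using C11 atomics and invented stand-ins (slot, make_key, keys_equal,
is_garbage) rather than the real concurrent_set types and callbacks: the
create step runs once before probing, so the probe loop itself never
allocates (and so never triggers GC), and a slot observed to hold a garbage
key is CASed back to empty and re-examined instead of being skipped.

// Illustrative sketch only -- not the Ruby implementation. Resizing, the
// separate hash word, the continuation bit, and funcs->free are omitted.
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define EMPTY    ((uintptr_t)0)
#define CAPACITY 64

struct slot {
    _Atomic uintptr_t key;
};

static struct slot table[CAPACITY];

// Stand-ins for the set callbacks and the GC query.
static uintptr_t make_key(uintptr_t seed)        { return seed | 1; }
static bool keys_equal(uintptr_t a, uintptr_t b) { return a == b; }
static bool is_garbage(uintptr_t k)              { (void)k; return false; }

static uintptr_t
find_or_insert(uintptr_t seed, uint64_t hash)
{
    // "create" runs once, up front, so the search below never allocates
    // and therefore never runs GC mid-search.
    uintptr_t key = make_key(seed);
    uint64_t idx = hash % CAPACITY;

    while (true) {
        struct slot *s = &table[idx];
        uintptr_t curr = atomic_load_explicit(&s->key, memory_order_acquire);

        if (curr == EMPTY) {
            uintptr_t expected = EMPTY;
            if (atomic_compare_exchange_strong_explicit(
                    &s->key, &expected, key,
                    memory_order_release, memory_order_relaxed)) {
                return key;      // inserted the key we created up front
            }
            curr = expected;     // another thread claimed the slot first
        }

        if (is_garbage(curr)) {
            // No GC can run during the search, so every inserting thread
            // agrees this slot holds garbage; they all race to clear it
            // and then re-examine the same slot.
            uintptr_t expected = curr;
            atomic_compare_exchange_strong_explicit(
                &s->key, &expected, EMPTY,
                memory_order_release, memory_order_relaxed);
            continue;            // retry the same slot
        }

        if (keys_equal(key, curr)) {
            return curr;         // existing entry wins (the real code
                                 // frees the unused key here)
        }

        idx = (idx + 1) % CAPACITY;   // linear probe onward
    }
}

int
main(void)
{
    uintptr_t a = find_or_insert(42, 42);
    uintptr_t b = find_or_insert(42, 42);
    printf("second call reused the first entry: %s\n", a == b ? "yes" : "no");
    return 0;
}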


@@ -222,11 +222,14 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
VALUE set_obj;
VALUE hash = 0;
struct concurrent_set *set;
struct concurrent_set_probe probe;
int idx;
retry:
set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
RUBY_ASSERT(set_obj);
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
set = RTYPEDDATA_GET_DATA(set_obj);
if (hash == 0) {
// We don't need to recompute the hash on every retry because it should
@@ -235,8 +238,7 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
}
RUBY_ASSERT(hash == concurrent_set_hash(set, key));
struct concurrent_set_probe probe;
int idx = concurrent_set_probe_start(&probe, set, hash);
idx = concurrent_set_probe_start(&probe, set, hash);
while (true) {
struct concurrent_set_entry *entry = &set->entries[idx];
@@ -299,37 +301,43 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
{
RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
bool inserting = false;
VALUE set_obj;
VALUE hash = 0;
retry:
set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
RUBY_ASSERT(set_obj);
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
if (hash == 0) {
// We don't need to recompute the hash on every retry because it should
// never change.
hash = concurrent_set_hash(set, key);
// First attempt to find
{
VALUE result = rb_concurrent_set_find(set_obj_ptr, key);
if (result) return result;
}
RUBY_ASSERT(hash == concurrent_set_hash(set, key));
// First time we need to call create, and store the hash
VALUE set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
RUBY_ASSERT(set_obj);
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
key = set->funcs->create(key, data);
VALUE hash = concurrent_set_hash(set, key);
struct concurrent_set_probe probe;
int idx = concurrent_set_probe_start(&probe, set, hash);
int idx;
goto start_search;
retry:
// On retries we only need to load the hash object
set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
RUBY_ASSERT(set_obj);
set = RTYPEDDATA_GET_DATA(set_obj);
RUBY_ASSERT(hash == concurrent_set_hash(set, key));
start_search:
idx = concurrent_set_probe_start(&probe, set, hash);
while (true) {
struct concurrent_set_entry *entry = &set->entries[idx];
VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;
if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
if (!inserting) {
key = set->funcs->create(key, data);
RUBY_ASSERT(hash == concurrent_set_hash(set, key));
inserting = true;
}
// Reserve this slot for our hash value
curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
@@ -340,6 +348,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
// CAS succeeded, so these are the values stored
curr_hash_and_flags = hash;
curr_hash = hash;
// Fall through to try to claim key
}
@@ -350,13 +359,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
switch (curr_key) {
case CONCURRENT_SET_EMPTY:
if (!inserting) {
key = set->funcs->create(key, data);
RUBY_ASSERT(hash == concurrent_set_hash(set, key));
inserting = true;
}
case CONCURRENT_SET_EMPTY: {
rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);
// Load_factor reached at 75% full. ex: prev_size: 32, capacity: 64, load_factor: 50%.
@@ -369,6 +372,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
if (prev_key == CONCURRENT_SET_EMPTY) {
RUBY_ASSERT(rb_concurrent_set_find(set_obj_ptr, key) == key);
RB_GC_GUARD(set_obj);
return key;
}
@@ -379,6 +383,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
// Another thread won the race, try again at the same location.
continue;
}
}
case CONCURRENT_SET_DELETED:
break;
case CONCURRENT_SET_MOVED:
@@ -386,22 +391,26 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
RB_VM_LOCKING();
goto retry;
default:
// We never GC during our search
// If the continuation bit wasn't set at the start of our search,
// any concurrent find with the same hash value would also look at
// this location and try to swap curr_key
if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
// This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
// Skip it and let the GC pass clean it up
break;
if (continuation) {
goto probe_next;
}
rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_EMPTY, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
continue;
}
if (set->funcs->cmp(key, curr_key)) {
// We've found a match.
// We've found a live match.
RB_GC_GUARD(set_obj);
if (inserting) {
// We created key using set->funcs->create, but we didn't end
// up inserting it into the set. Free it here to prevent memory
// leaks.
if (set->funcs->free) set->funcs->free(key);
}
// We created key using set->funcs->create, but we didn't end
// up inserting it into the set. Free it here to prevent memory
// leaks.
if (set->funcs->free) set->funcs->free(key);
return curr_key;
}
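
The default branch above now clears a garbage slot with a plain CAS and
retries it, relying on the fact that no GC can happen mid-search, so every
inserting thread sees the same garbage. Here is a stand-alone toy of just
that race, assuming C11 atomics and threads; the slot, GARBAGE sentinel, and
worker below are invented for illustration and are not the concurrent_set
API. Every thread attempts the same clearing CAS, and exactly one of them
ends up owning the recycled slot.

// Illustrative only. Build (glibc or musl): cc -std=c11 -pthread demo.c
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <threads.h>

#define EMPTY    ((uintptr_t)0)
#define GARBAGE  ((uintptr_t)0xdead)   // stand-in for a swept-out object
#define NTHREADS 8

static _Atomic uintptr_t slot = GARBAGE;
static _Atomic int winners = 0;

static int
worker(void *arg)
{
    uintptr_t my_key = (uintptr_t)arg;

    // Everyone agrees the slot holds garbage; everyone tries to clear it.
    uintptr_t expected = GARBAGE;
    atomic_compare_exchange_strong_explicit(
        &slot, &expected, EMPTY,
        memory_order_release, memory_order_relaxed);

    // Then everyone retries the same slot; only one CAS from EMPTY succeeds.
    expected = EMPTY;
    if (atomic_compare_exchange_strong_explicit(
            &slot, &expected, my_key,
            memory_order_release, memory_order_relaxed)) {
        atomic_fetch_add(&winners, 1);
    }
    // Losers would fall back to cmp/free, as in the real find_or_insert.
    return 0;
}

int
main(void)
{
    thrd_t threads[NTHREADS];
    for (int i = 0; i < NTHREADS; i++) {
        thrd_create(&threads[i], worker, (void *)(uintptr_t)(i + 1));
    }
    for (int i = 0; i < NTHREADS; i++) {
        thrd_join(threads[i], NULL);
    }
    printf("slot reused by exactly one thread: %s (winners=%d)\n",
           atomic_load(&winners) == 1 ? "yes" : "no",
           atomic_load(&winners));
    return 0;
}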