ruby/concurrent_set.c
#include "internal.h"
#include "internal/gc.h"
#include "internal/concurrent_set.h"
#include "ruby/atomic.h"
#include "vm_sync.h"
#define CONCURRENT_SET_CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
#define CONCURRENT_SET_HASH_MASK (~CONCURRENT_SET_CONTINUATION_BIT)
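
/*
 * Special sentinel values stored in the `key` slot of an entry. Real keys are
 * always VALUEs >= CONCURRENT_SET_SPECIAL_VALUE_COUNT. EMPTY means the slot
 * was never claimed (or was reclaimed), DELETED is a tombstone left inside a
 * probe chain, and MOVED means the entry has been migrated to a new table by
 * a resize.
 */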
enum concurrent_set_special_values {
    CONCURRENT_SET_EMPTY,
    CONCURRENT_SET_DELETED,
    CONCURRENT_SET_MOVED,
    CONCURRENT_SET_SPECIAL_VALUE_COUNT
};

struct concurrent_set_entry {
    VALUE hash;
    VALUE key;
};
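
/*
 * `size` counts claimed entries and is incremented before the key is
 * published, so it can briefly overcount while inserts are racing.
 * `capacity` is always a power of two, and `deleted_entries` counts
 * tombstones so that a resize can estimate the number of live keys.
 */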
struct concurrent_set {
    rb_atomic_t size;
    unsigned int capacity;
    unsigned int deleted_entries;
    const struct rb_concurrent_set_funcs *funcs;
    struct concurrent_set_entry *entries;
};
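
/*
 * Mark `entry` as part of a longer probe chain: once the continuation bit is
 * set, lookups that see a non-matching hash here keep probing instead of
 * treating the mismatch as "key not present".
 */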
static void
concurrent_set_mark_continuation(struct concurrent_set_entry *entry, VALUE curr_hash_and_flags)
{
    if (curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT) return;

    RUBY_ASSERT((curr_hash_and_flags & CONCURRENT_SET_HASH_MASK) != 0);
    VALUE new_hash = curr_hash_and_flags | CONCURRENT_SET_CONTINUATION_BIT;
    VALUE prev_hash = rbimpl_atomic_value_cas(&entry->hash, curr_hash_and_flags, new_hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);

    // At the moment we only expect to be racing against another thread that
    // is also setting the continuation bit. If deletion ever becomes
    // concurrent, this will need adjusting.
    RUBY_ASSERT(prev_hash == curr_hash_and_flags || prev_hash == new_hash);
    (void)prev_hash;
}

static VALUE
concurrent_set_hash(const struct concurrent_set *set, VALUE key)
{
    VALUE hash = set->funcs->hash(key);
    hash &= CONCURRENT_SET_HASH_MASK;
    if (hash == 0) {
        hash ^= CONCURRENT_SET_HASH_MASK;
    }
    RUBY_ASSERT(hash != 0);
    RUBY_ASSERT(!(hash & CONCURRENT_SET_CONTINUATION_BIT));
    return hash;
}

static void
concurrent_set_free(void *ptr)
{
    struct concurrent_set *set = ptr;
    xfree(set->entries);
}

static size_t
concurrent_set_size(const void *ptr)
{
    const struct concurrent_set *set = ptr;
    return sizeof(struct concurrent_set) +
           (set->capacity * sizeof(struct concurrent_set_entry));
}

/* Hack: Though it would be trivial, we're intentionally avoiding WB-protecting
* this object. This prevents the object from aging and ensures it can always be
* collected in a minor GC.
* Longer term this deserves a better way to reclaim memory promptly.
*/
static void
concurrent_set_mark(void *ptr)
{
    (void)ptr;
}

static const rb_data_type_t concurrent_set_type = {
    .wrap_struct_name = "VM/concurrent_set",
    .function = {
        .dmark = concurrent_set_mark,
        .dfree = concurrent_set_free,
        .dsize = concurrent_set_size,
    },
    /* Hack: NOT WB_PROTECTED on purpose (see above) */
    .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_EMBEDDABLE
};
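
/*
 * A minimal usage sketch (not taken from this file): the callback names and
 * exact signatures below are assumptions inferred from how `funcs` is used in
 * this file; see internal/concurrent_set.h for the authoritative definitions.
 *
 *     static VALUE my_hash(VALUE key)               { return (VALUE)rb_str_hash(key); }
 *     static bool  my_cmp(VALUE a, VALUE b)         { return RTEST(rb_str_equal(a, b)); }
 *     static VALUE my_create(VALUE key, void *data) { return rb_str_dup(key); }
 *
 *     static const struct rb_concurrent_set_funcs my_funcs = {
 *         .hash = my_hash, .cmp = my_cmp, .create = my_create,
 *     };
 *
 *     // capacity must be a power of two (see concurrent_set_probe_start)
 *     VALUE set_obj = rb_concurrent_set_new(&my_funcs, 16);
 *     VALUE interned = rb_concurrent_set_find_or_insert(&set_obj, str, NULL);
 */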
VALUE
rb_concurrent_set_new(const struct rb_concurrent_set_funcs *funcs, int capacity)
{
    struct concurrent_set *set;
    VALUE obj = TypedData_Make_Struct(0, struct concurrent_set, &concurrent_set_type, set);
    set->funcs = funcs;
    set->entries = ZALLOC_N(struct concurrent_set_entry, capacity);
    set->capacity = capacity;
    return obj;
}

rb_atomic_t
rb_concurrent_set_size(VALUE set_obj)
{
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
    return RUBY_ATOMIC_LOAD(set->size);
}
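
/*
 * Open-addressing probe state. Probing starts at hash & (capacity - 1) and
 * then advances by increments of 1, 2, 3, ... so the cumulative offsets are
 * triangular numbers. With a power-of-two capacity this sequence visits every
 * slot before repeating.
 */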
struct concurrent_set_probe {
    int idx;
    int d;
    int mask;
};

static int
concurrent_set_probe_start(struct concurrent_set_probe *probe, struct concurrent_set *set, VALUE hash)
{
    RUBY_ASSERT((set->capacity & (set->capacity - 1)) == 0);
    probe->d = 0;
    probe->mask = set->capacity - 1;
    probe->idx = hash & probe->mask;
    return probe->idx;
}

static int
concurrent_set_probe_next(struct concurrent_set_probe *probe)
{
    probe->d++;
    probe->idx = (probe->idx + probe->d) & probe->mask;
    return probe->idx;
}
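
/*
 * Rebuild the table into a freshly allocated set object, then publish it
 * through *set_obj_ptr. Runs with the VM lock held (see
 * concurrent_set_try_resize below). Each old slot's key is swapped to
 * CONCURRENT_SET_MOVED so lock-free readers that land on it know to wait for
 * the resize and retry against the new table; dead weak references are
 * dropped instead of being copied.
 */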
static void
concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
{
    // Check if another thread has already resized.
    if (rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE) != old_set_obj) {
        return;
    }

    struct concurrent_set *old_set = RTYPEDDATA_GET_DATA(old_set_obj);

    // This may overcount by up to the number of threads concurrently attempting to insert
    // GC may also happen between now and the set being rebuilt
    int expected_size = rbimpl_atomic_load(&old_set->size, RBIMPL_ATOMIC_RELAXED) - old_set->deleted_entries;

    // NOTE: new capacity must make sense with load factor, don't change one without checking the other.
    struct concurrent_set_entry *old_entries = old_set->entries;
    int old_capacity = old_set->capacity;
    int new_capacity = old_capacity * 2;
    if (new_capacity > expected_size * 8) {
        new_capacity = old_capacity / 2;
    }
    else if (new_capacity > expected_size * 4) {
        new_capacity = old_capacity;
    }

    // May cause GC and therefore deletes, so must happen first.
    VALUE new_set_obj = rb_concurrent_set_new(old_set->funcs, new_capacity);
    struct concurrent_set *new_set = RTYPEDDATA_GET_DATA(new_set_obj);

    for (int i = 0; i < old_capacity; i++) {
        struct concurrent_set_entry *old_entry = &old_entries[i];

        VALUE key = rbimpl_atomic_value_exchange(&old_entry->key, CONCURRENT_SET_MOVED, RBIMPL_ATOMIC_ACQUIRE);
        RUBY_ASSERT(key != CONCURRENT_SET_MOVED);

        if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
        if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;

        VALUE hash = rbimpl_atomic_value_load(&old_entry->hash, RBIMPL_ATOMIC_RELAXED) & CONCURRENT_SET_HASH_MASK;
        RUBY_ASSERT(hash != 0);
        RUBY_ASSERT(hash == concurrent_set_hash(old_set, key));

        // Insert key into new_set.
        struct concurrent_set_probe probe;
        int idx = concurrent_set_probe_start(&probe, new_set, hash);

        while (true) {
            struct concurrent_set_entry *entry = &new_set->entries[idx];

            if (entry->hash == CONCURRENT_SET_EMPTY) {
                RUBY_ASSERT(entry->key == CONCURRENT_SET_EMPTY);

                new_set->size++;
                RUBY_ASSERT(new_set->size <= new_set->capacity / 2);

                entry->key = key;
                entry->hash = hash;
                break;
            }

            RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
            entry->hash |= CONCURRENT_SET_CONTINUATION_BIT;

            idx = concurrent_set_probe_next(&probe);
        }
    }

    rbimpl_atomic_value_store(set_obj_ptr, new_set_obj, RBIMPL_ATOMIC_RELEASE);

    RB_GC_GUARD(old_set_obj);
}

static void
concurrent_set_try_resize(VALUE old_set_obj, VALUE *set_obj_ptr)
{
    RB_VM_LOCKING() {
        concurrent_set_try_resize_without_locking(old_set_obj, set_obj_ptr);
    }
}
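
/*
 * Lock-free lookup. Returns the stored key that compares equal to `key`, or 0
 * if there is no live match. May block and retry if it observes an entry that
 * a concurrent resize has marked as MOVED.
 */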
VALUE
rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
{
    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    VALUE set_obj;
    VALUE hash = 0;
    struct concurrent_set *set;
    struct concurrent_set_probe probe;
    int idx;

  retry:
    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
    RUBY_ASSERT(set_obj);
    set = RTYPEDDATA_GET_DATA(set_obj);

    if (hash == 0) {
        // We don't need to recompute the hash on every retry because it should
        // never change.
        hash = concurrent_set_hash(set, key);
    }
    RUBY_ASSERT(hash == concurrent_set_hash(set, key));

    idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;

        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
            return 0;
        }

        if (curr_hash != hash) {
            if (!continuation) {
                return 0;
            }

            idx = concurrent_set_probe_next(&probe);
            continue;
        }

        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
            // In-progress insert: the hash has been written but the key has not yet.
            break;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // Wait for the resize to complete, then retry against the new table.
            RB_VM_LOCKING();
            goto retry;
          default: {
            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                // This is a weakref set, so after marking but before sweeping
                // is complete we may find a matching garbage object.
                // Skip it and let the GC pass clean it up.
                break;
            }

            if (set->funcs->cmp(key, curr_key)) {
                // We've found a match.
                RB_GC_GUARD(set_obj);
                return curr_key;
            }

            if (!continuation) {
                return 0;
            }

            break;
          }
        }

        idx = concurrent_set_probe_next(&probe);
    }
}
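
/*
 * Look up `key`, inserting it if it is not already present. The object that
 * actually gets stored is built by set->funcs->create(key, data); if another
 * thread wins the race with an equal key, the newly created object is
 * released via set->funcs->free (when provided) and the winner's key is
 * returned. A slot is claimed in two steps: first the hash is reserved with a
 * CAS, then the key is published with a second CAS.
 */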
VALUE
rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
{
    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    // First attempt to find the key without inserting.
    {
        VALUE result = rb_concurrent_set_find(set_obj_ptr, key);
        if (result) return result;
    }

    // Not found: create the object we intend to insert and compute its hash.
    // This only needs to happen once, even if we retry below.
    VALUE set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
    RUBY_ASSERT(set_obj);
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    key = set->funcs->create(key, data);
    VALUE hash = concurrent_set_hash(set, key);

    struct concurrent_set_probe probe;
    int idx;
    goto start_search;

  retry:
    // On retries we only need to reload the set object; the key and its hash
    // do not change.
    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
    RUBY_ASSERT(set_obj);
    set = RTYPEDDATA_GET_DATA(set_obj);
    RUBY_ASSERT(hash == concurrent_set_hash(set, key));

  start_search:
    idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;

        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
            // Reserve this slot for our hash value.
            curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
            if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
                // Lost the race; retry the same slot to check the winner's hash.
                continue;
            }

            // CAS succeeded, so these are the values stored.
            curr_hash_and_flags = hash;
            curr_hash = hash;
            // Fall through to try to claim the key.
        }

        if (curr_hash != hash) {
            goto probe_next;
        }

        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY: {
            rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

            // The load factor is reached at 75% full, e.g. prev_size: 48, capacity: 64.
            bool load_factor_reached = (uint64_t)(prev_size * 4) >= (uint64_t)(set->capacity * 3);
            if (UNLIKELY(load_factor_reached)) {
                concurrent_set_try_resize(set_obj, set_obj_ptr);
                goto retry;
            }

            VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
            if (prev_key == CONCURRENT_SET_EMPTY) {
                RUBY_ASSERT(rb_concurrent_set_find(set_obj_ptr, key) == key);
                RB_GC_GUARD(set_obj);
                return key;
            }
            else {
                // Entry was not inserted.
                rbimpl_atomic_sub(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

                // Another thread won the race, try again at the same location.
                continue;
            }
          }
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // Wait for the resize to complete, then retry against the new table.
            RB_VM_LOCKING();
            goto retry;
          default:
            // We never trigger GC during our own search. If the continuation
            // bit wasn't set at the start of our search, any concurrent
            // find_or_insert with the same hash value would also look at this
            // location and try to swap curr_key.
            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                if (continuation) {
                    goto probe_next;
                }

                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_EMPTY, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
                continue;
            }

            if (set->funcs->cmp(key, curr_key)) {
                // We've found a live match.
                RB_GC_GUARD(set_obj);

                // We created key using set->funcs->create, but we didn't end
                // up inserting it into the set. Free it here to prevent memory
                // leaks.
                if (set->funcs->free) set->funcs->free(key);

                return curr_key;
            }

            break;
        }

      probe_next:
        RUBY_ASSERT(curr_hash_and_flags != CONCURRENT_SET_EMPTY);
        concurrent_set_mark_continuation(entry, curr_hash_and_flags);

        idx = concurrent_set_probe_next(&probe);
    }
}
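
/*
 * Remove an entry while holding the VM lock. Slots in the middle of a probe
 * chain (continuation bit set) become DELETED tombstones so later probes keep
 * scanning; slots at the end of a chain are returned to EMPTY.
 */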
static void
concurrent_set_delete_entry_locked(struct concurrent_set *set, struct concurrent_set_entry *entry)
{
    ASSERT_vm_locking_with_barrier();

    if (entry->hash & CONCURRENT_SET_CONTINUATION_BIT) {
        entry->hash = CONCURRENT_SET_CONTINUATION_BIT;
        entry->key = CONCURRENT_SET_DELETED;
        set->deleted_entries++;
    }
    else {
        entry->hash = CONCURRENT_SET_EMPTY;
        entry->key = CONCURRENT_SET_EMPTY;
        set->size--;
    }
}
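
/*
 * Delete the entry whose key is pointer-identical to `key` (the cmp callback
 * is not consulted). Returns the deleted key, or 0 if it was not found. Must
 * be called with the VM lock held (with barrier).
 */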
VALUE
rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
{
    ASSERT_vm_locking_with_barrier();

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
    VALUE hash = concurrent_set_hash(set, key);

    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_key = entry->key;

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
            // We didn't find our entry to delete.
            return 0;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_delete_by_identity: moved entry");
            break;
          default:
            if (key == curr_key) {
                RUBY_ASSERT((entry->hash & CONCURRENT_SET_HASH_MASK) == hash);
                concurrent_set_delete_entry_locked(set, entry);
                return curr_key;
            }
            break;
        }

        idx = concurrent_set_probe_next(&probe);
    }
}
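
/*
 * Iterate over every live entry while holding the VM lock. The callback
 * receives a pointer to the stored key, so it may replace the key in place;
 * returning ST_STOP ends the iteration and ST_DELETE removes the entry.
 */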
void
rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key, void *data), void *data)
{
    ASSERT_vm_locking_with_barrier();

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    for (unsigned int i = 0; i < set->capacity; i++) {
        struct concurrent_set_entry *entry = &set->entries[i];
        VALUE key = entry->key;

        switch (key) {
          case CONCURRENT_SET_EMPTY:
          case CONCURRENT_SET_DELETED:
            continue;
          case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_foreach_with_replace: moved entry");
            break;
          default: {
            int ret = callback(&entry->key, data);
            switch (ret) {
              case ST_STOP:
                return;
              case ST_DELETE:
                concurrent_set_delete_entry_locked(set, entry);
                break;
            }
            break;
          }
        }
    }
}