Mirror of https://github.com/ruby/ruby.git (synced 2026-01-26 20:19:19 +00:00)
Use continuation bit in concurrent set
This refactors the concurrent set to examine and reserve a slot via CAS on the hash before doing the same with the key. This lets us use an extra bit of the hash as a "continuation bit", which marks whether an insert has ever probed past this slot. When that bit isn't set at deletion time, we can clear the entry outright instead of placing a tombstone.
commit 81fbdff8fd
parent 492b1c73b3
Notes: git 2025-12-10 06:48:37 +00:00
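To make the encoding concrete, here is a minimal single-threaded sketch (hypothetical stand-in types and names; the committed concurrent_set.c code does this with atomic CAS on live entries) of how the top hash bit doubles as the continuation flag and drives the delete decision:

#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>

typedef uintptr_t VALUE; /* stand-in for Ruby's VALUE */

/* Top bit flags "an insert probed past this slot"; the rest hold the hash. */
#define CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
#define HASH_MASK        (~CONTINUATION_BIT)

/* Fold a raw hash into the masked range, remapping 0 (reserved for "empty"). */
static VALUE
fold_hash(VALUE raw)
{
    VALUE hash = raw & HASH_MASK;
    if (hash == 0) hash ^= HASH_MASK;
    return hash;
}

int
main(void)
{
    VALUE slot = fold_hash(0xdeadbeef);

    /* An insert that has to keep probing past this slot sets the bit ... */
    slot |= CONTINUATION_BIT;

    /* ... so deletion must leave a tombstone; otherwise it may clear the
     * slot back to empty, keeping probe chains short. */
    if (slot & CONTINUATION_BIT) {
        printf("chain continues past slot: place tombstone\n");
    }
    else {
        printf("nothing ever probed past: clear slot to empty\n");
    }

    assert(fold_hash(0) != 0); /* hash 0 is remapped; 0 stays "empty" */
    return 0;
}

The win is on deletion: a slot that no probe chain ever crossed can go straight back to empty, so later lookups landing there stop immediately rather than walking over a tombstone.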
@@ -1494,6 +1494,9 @@ assert_equal "ok", %Q{
     unless a[i].equal?(b[i])
       raise [a[i], b[i]].inspect
     end
+    unless a[i] == i.to_s
+      raise [i, a[i], b[i]].inspect
+    end
   end
   :ok
 }
concurrent_set.c: 173 changed lines
@@ -4,6 +4,9 @@
 #include "ruby/atomic.h"
 #include "vm_sync.h"
 
+#define CONCURRENT_SET_CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
+#define CONCURRENT_SET_HASH_MASK (~CONCURRENT_SET_CONTINUATION_BIT)
+
 enum concurrent_set_special_values {
     CONCURRENT_SET_EMPTY,
     CONCURRENT_SET_DELETED,
@@ -24,6 +27,36 @@ struct concurrent_set {
     struct concurrent_set_entry *entries;
 };
 
+static void
+concurrent_set_mark_continuation(struct concurrent_set_entry *entry, VALUE curr_hash_and_flags)
+{
+    if (curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT) return;
+
+    RUBY_ASSERT((curr_hash_and_flags & CONCURRENT_SET_HASH_MASK) != 0);
+
+    VALUE new_hash = curr_hash_and_flags | CONCURRENT_SET_CONTINUATION_BIT;
+    VALUE prev_hash = rbimpl_atomic_value_cas(&entry->hash, curr_hash_and_flags, new_hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+
+    // At the moment we only expect to be racing concurrently against another
+    // thread also setting the continuation bit.
+    // In the future, if deletion is concurrent, this will need adjusting.
+    RUBY_ASSERT(prev_hash == curr_hash_and_flags || prev_hash == new_hash);
+    (void)prev_hash;
+}
+
+static VALUE
+concurrent_set_hash(const struct concurrent_set *set, VALUE key)
+{
+    VALUE hash = set->funcs->hash(key);
+    hash &= CONCURRENT_SET_HASH_MASK;
+    if (hash == 0) {
+        hash ^= CONCURRENT_SET_HASH_MASK;
+    }
+    RUBY_ASSERT(hash != 0);
+    RUBY_ASSERT(!(hash & CONCURRENT_SET_CONTINUATION_BIT));
+    return hash;
+}
+
 static void
 concurrent_set_free(void *ptr)
 {
@@ -141,13 +174,9 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
         if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
         if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;
 
-        VALUE hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
-        if (hash == 0) {
-            // Either in-progress insert or extremely unlikely 0 hash.
-            // Re-calculate the hash.
-            hash = old_set->funcs->hash(key);
-        }
-        RUBY_ASSERT(hash == old_set->funcs->hash(key));
+        VALUE hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED) & CONCURRENT_SET_HASH_MASK;
+        RUBY_ASSERT(hash != 0);
+        RUBY_ASSERT(hash == concurrent_set_hash(old_set, key));
 
         // Insert key into new_set.
         struct concurrent_set_probe probe;
@@ -156,20 +185,19 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
        while (true) {
            struct concurrent_set_entry *entry = &new_set->entries[idx];
 
-            if (entry->key == CONCURRENT_SET_EMPTY) {
-                new_set->size++;
+            if (entry->hash == CONCURRENT_SET_EMPTY) {
+                RUBY_ASSERT(entry->key == CONCURRENT_SET_EMPTY);
+
+                new_set->size++;
                 RUBY_ASSERT(new_set->size <= new_set->capacity / 2);
-                RUBY_ASSERT(entry->hash == 0);
 
                 entry->key = key;
                 entry->hash = hash;
                 break;
             }
-            else {
-                RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
-            }
 
+            RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
+            entry->hash |= CONCURRENT_SET_CONTINUATION_BIT;
            idx = concurrent_set_probe_next(&probe);
        }
    }
@@ -203,20 +231,37 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
    if (hash == 0) {
        // We don't need to recompute the hash on every retry because it should
        // never change.
-        hash = set->funcs->hash(key);
+        hash = concurrent_set_hash(set, key);
    }
-    RUBY_ASSERT(hash == set->funcs->hash(key));
+    RUBY_ASSERT(hash == concurrent_set_hash(set, key));
 
    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);
 
    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
+        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
+        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;
+
+        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
+            return 0;
+        }
+
+        if (curr_hash != hash) {
+            if (!continuation) {
+                return 0;
+            }
+            idx = concurrent_set_probe_next(&probe);
+            continue;
+        }
+
        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
 
        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
-            return 0;
+            // In-progress insert: hash written but key not yet.
+            break;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
@@ -225,13 +270,9 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
 
            goto retry;
          default: {
-            VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
-            if (curr_hash != 0 && curr_hash != hash) break;
-
            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
-                // Skip it and mark it as deleted.
-                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+                // Skip it and let the GC pass clean it up.
                break;
            }
 
@@ -241,6 +282,10 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
                return curr_key;
            }
 
+            if (!continuation) {
+                return 0;
+            }
+
            break;
          }
        }
@@ -266,23 +311,49 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
    if (hash == 0) {
        // We don't need to recompute the hash on every retry because it should
        // never change.
-        hash = set->funcs->hash(key);
+        hash = concurrent_set_hash(set, key);
    }
-    RUBY_ASSERT(hash == set->funcs->hash(key));
+    RUBY_ASSERT(hash == concurrent_set_hash(set, key));
 
    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);
 
    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
+        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
+        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+
+        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
+            if (!inserting) {
+                key = set->funcs->create(key, data);
+                RUBY_ASSERT(hash == concurrent_set_hash(set, key));
+                inserting = true;
+            }
+
+            // Reserve this slot for our hash value.
+            curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+            if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
+                // Lost race, retry same slot to check winner's hash.
+                continue;
+            }
+
+            // CAS succeeded, so these are the values stored.
+            curr_hash_and_flags = hash;
+            curr_hash = hash;
+            // Fall through to try to claim key.
+        }
+
+        if (curr_hash != hash) {
+            goto probe_next;
+        }
+
        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
 
        switch (curr_key) {
-          case CONCURRENT_SET_EMPTY: {
-            // Not in set
+          case CONCURRENT_SET_EMPTY:
            if (!inserting) {
                key = set->funcs->create(key, data);
-                RUBY_ASSERT(hash == set->funcs->hash(key));
+                RUBY_ASSERT(hash == concurrent_set_hash(set, key));
                inserting = true;
            }
 
@@ -293,14 +364,11 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
 
            if (UNLIKELY(load_factor_reached)) {
                concurrent_set_try_resize(set_obj, set_obj_ptr);
-
                goto retry;
            }
 
-            curr_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
-            if (curr_key == CONCURRENT_SET_EMPTY) {
-                rbimpl_atomic_value_store(&entry->hash, hash, RBIMPL_ATOMIC_RELAXED);
-
+            VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+            if (prev_key == CONCURRENT_SET_EMPTY) {
                RB_GC_GUARD(set_obj);
                return key;
            }
@@ -311,22 +379,16 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
                // Another thread won the race, try again at the same location.
                continue;
            }
-          }
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // Wait
            RB_VM_LOCKING();
 
            goto retry;
-          default: {
-            VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
-            if (curr_hash != 0 && curr_hash != hash) break;
-
+          default:
            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
-                // Skip it and mark it as deleted.
-                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+                // Skip it and let the GC pass clean it up.
                break;
            }
 
@@ -343,15 +405,33 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
 
                return curr_key;
            }
 
            break;
-          }
        }
 
+      probe_next:
+        RUBY_ASSERT(curr_hash_and_flags != CONCURRENT_SET_EMPTY);
+        concurrent_set_mark_continuation(entry, curr_hash_and_flags);
        idx = concurrent_set_probe_next(&probe);
    }
 }
 
+static void
+concurrent_set_delete_entry_locked(struct concurrent_set *set, struct concurrent_set_entry *entry)
+{
+    ASSERT_vm_locking_with_barrier();
+
+    if (entry->hash & CONCURRENT_SET_CONTINUATION_BIT) {
+        entry->hash = CONCURRENT_SET_CONTINUATION_BIT;
+        entry->key = CONCURRENT_SET_DELETED;
+        set->deleted_entries++;
+    }
+    else {
+        entry->hash = CONCURRENT_SET_EMPTY;
+        entry->key = CONCURRENT_SET_EMPTY;
+        set->size--;
+    }
+}
+
 VALUE
 rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
 {
@@ -359,7 +439,7 @@ rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
 
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
 
-    VALUE hash = set->funcs->hash(key);
+    VALUE hash = concurrent_set_hash(set, key);
 
    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);
@@ -379,8 +459,8 @@ rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
            break;
          default:
            if (key == curr_key) {
-                entry->key = CONCURRENT_SET_DELETED;
-                set->deleted_entries++;
+                RUBY_ASSERT((entry->hash & CONCURRENT_SET_HASH_MASK) == hash);
+                concurrent_set_delete_entry_locked(set, entry);
                return curr_key;
            }
            break;
@@ -399,7 +479,7 @@ rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key
 
    for (unsigned int i = 0; i < set->capacity; i++) {
        struct concurrent_set_entry *entry = &set->entries[i];
-        VALUE key = set->entries[i].key;
+        VALUE key = entry->key;
 
        switch (key) {
          case CONCURRENT_SET_EMPTY:
@@ -414,8 +494,7 @@ rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key
          case ST_STOP:
            return;
          case ST_DELETE:
-            set->entries[i].key = CONCURRENT_SET_DELETED;
-            set->deleted_entries++;
+            concurrent_set_delete_entry_locked(set, entry);
            break;
        }
        break;
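Reading the bit back is what shortens lookups. The following is a hedged single-threaded sketch (hypothetical names, plain linear probing rather than the probe sequence concurrent_set.c actually uses) of a probe loop that gives up at the first non-matching slot whose continuation bit is clear:

#include <limits.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical sketch; hash_and_flags == 0 means "empty". */
struct entry {
    unsigned long hash_and_flags;
    unsigned long key;
};

#define CONT_BIT  (1UL << (sizeof(unsigned long) * CHAR_BIT - 1))
#define HASH_MASK (~CONT_BIT)

static bool
set_lookup(const struct entry *entries, size_t capacity,
           unsigned long hash, unsigned long key)
{
    size_t idx = hash % capacity;       /* simplified probe start */
    for (size_t n = 0; n < capacity; n++) {
        const struct entry *e = &entries[idx];
        if (e->hash_and_flags == 0) {
            return false;               /* empty slot: definite miss */
        }
        if ((e->hash_and_flags & HASH_MASK) == hash && e->key == key) {
            return true;                /* hash and key both match */
        }
        if (!(e->hash_and_flags & CONT_BIT)) {
            return false;               /* no insert ever probed past here */
        }
        idx = (idx + 1) % capacity;     /* chain continues: keep probing */
    }
    return false;
}

int
main(void)
{
    struct entry table[4] = {
        { 8 | CONT_BIT, 7 },            /* an insert once probed past this slot */
        { 12, 9 },                      /* probe chain ends here */
        { 0, 0 },
        { 0, 0 },
    };
    printf("%d\n", (int)set_lookup(table, 4, 12, 9));  /* 1: found after one hop */
    printf("%d\n", (int)set_lookup(table, 4, 16, 5));  /* 0: stops, bit clear */
    return 0;
}

Without the continuation bit, the miss in the second lookup could only be detected at a truly empty slot, which is exactly what long-lived tombstones make scarce.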