Mirror of https://github.com/ruby/ruby.git
Use explicit memory orders in concurrent_set
The atomic load/store operations here should mostly be using release/acquire semantics. This may lead to better performance than what we had under the default seq_cst. On x86 this may make the atomic store of the hash faster, as it can avoid xchg. On ARM the loads may be faster (depending on the CPU the compiler targets). Reference for a comparison of the atomic operations: https://godbolt.org/z/6EdaMa5rG
Commit: 45c016866c
Parent: 9e4a756963
Notes (git): 2025-10-16 01:22:21 +00:00
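
The performance claim above comes down to instruction selection. A minimal standalone C11 sketch (illustrative only, not part of this commit; the name slot_hash is made up) of the difference between a seq_cst store, a release store, and an acquire load:

#include <stdatomic.h>

static _Atomic(unsigned long) slot_hash;

void
store_seq_cst(unsigned long h)
{
    // Default memory order is seq_cst: on x86 this typically compiles to xchg
    // (or mov + mfence), i.e. a full barrier.
    atomic_store(&slot_hash, h);
}

void
store_release(unsigned long h)
{
    // Release order: on x86 a plain mov suffices, since ordinary x86 stores
    // already have release semantics.
    atomic_store_explicit(&slot_hash, h, memory_order_release);
}

unsigned long
load_acquire(void)
{
    // Acquire order: on AArch64 this may lower to a cheaper load-acquire
    // (e.g. ldapr on newer cores) than what seq_cst requires, which is why
    // the gain depends on the target CPU.
    return atomic_load_explicit(&slot_hash, memory_order_acquire);
}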
concurrent_set.c
@@ -1,7 +1,6 @@
 #include "internal.h"
 #include "internal/gc.h"
 #include "internal/concurrent_set.h"
-#include "ruby_atomic.h"
 #include "ruby/atomic.h"
 #include "vm_sync.h"
 
@@ -109,7 +108,7 @@ static void
 concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
 {
     // Check if another thread has already resized.
-    if (RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr) != old_set_obj) {
+    if (rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE) != old_set_obj) {
         return;
     }
 
@@ -117,7 +116,7 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
 
     // This may overcount by up to the number of threads concurrently attempting to insert
     // GC may also happen between now and the set being rebuilt
-    int expected_size = RUBY_ATOMIC_LOAD(old_set->size) - old_set->deleted_entries;
+    int expected_size = rbimpl_atomic_load(&old_set->size, RBIMPL_ATOMIC_RELAXED) - old_set->deleted_entries;
 
     struct concurrent_set_entry *old_entries = old_set->entries;
     int old_capacity = old_set->capacity;
@@ -135,13 +134,13 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
 
     for (int i = 0; i < old_capacity; i++) {
         struct concurrent_set_entry *entry = &old_entries[i];
-        VALUE key = RUBY_ATOMIC_VALUE_EXCHANGE(entry->key, CONCURRENT_SET_MOVED);
+        VALUE key = rbimpl_atomic_value_exchange(&entry->key, CONCURRENT_SET_MOVED, RBIMPL_ATOMIC_ACQUIRE);
         RUBY_ASSERT(key != CONCURRENT_SET_MOVED);
 
         if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
         if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;
 
-        VALUE hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
+        VALUE hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
         if (hash == 0) {
             // Either in-progress insert or extremely unlikely 0 hash.
             // Re-calculate the hash.
@@ -174,7 +173,7 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
         }
     }
 
-    RUBY_ATOMIC_VALUE_SET(*set_obj_ptr, new_set_obj);
+    rbimpl_atomic_value_store(set_obj_ptr, new_set_obj, RBIMPL_ATOMIC_RELEASE);
 
     RB_GC_GUARD(old_set_obj);
 }
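
The resize path above publishes the rebuilt table with a release store into *set_obj_ptr, and the lookup/insert paths below re-read that pointer with an acquire load, so a thread that observes the new set object also observes the entries written into it before publication. A minimal C11 sketch of that pairing (illustrative types and names, not the commit's code):

#include <stdatomic.h>

struct table;                                  /* stand-in for the set object */

static _Atomic(struct table *) current_table;  /* stand-in for *set_obj_ptr */

/* Writer: fill in the new table first, then publish it with a release store. */
void
publish_table(struct table *fresh)
{
    atomic_store_explicit(&current_table, fresh, memory_order_release);
}

/* Reader: an acquire load that returns `fresh` is guaranteed to also see
 * every write the publisher made to *fresh before the release store. */
struct table *
acquire_table(void)
{
    return atomic_load_explicit(&current_table, memory_order_acquire);
}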
@@ -196,7 +195,7 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
     VALUE hash = 0;
 
   retry:
-    set_obj = RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr);
+    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
     RUBY_ASSERT(set_obj);
     struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
 
@@ -212,7 +211,7 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
 
     while (true) {
         struct concurrent_set_entry *entry = &set->entries[idx];
-        VALUE curr_key = RUBY_ATOMIC_VALUE_LOAD(entry->key);
+        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
 
         switch (curr_key) {
           case CONCURRENT_SET_EMPTY:
@@ -225,13 +224,13 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
 
             goto retry;
           default: {
-            VALUE curr_hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
+            VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
             if (curr_hash != 0 && curr_hash != hash) break;
 
             if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                 // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
                 // Skip it and mark it as deleted.
-                RUBY_ATOMIC_VALUE_CAS(entry->key, curr_key, CONCURRENT_SET_DELETED);
+                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
                 break;
             }
 
@@ -259,7 +258,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
     VALUE hash = 0;
 
   retry:
-    set_obj = RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr);
+    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
     RUBY_ASSERT(set_obj);
     struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
 
@@ -275,7 +274,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
 
     while (true) {
         struct concurrent_set_entry *entry = &set->entries[idx];
-        VALUE curr_key = RUBY_ATOMIC_VALUE_LOAD(entry->key);
+        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
 
         switch (curr_key) {
           case CONCURRENT_SET_EMPTY: {
@@ -286,7 +285,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
                 inserting = true;
             }
 
-            rb_atomic_t prev_size = RUBY_ATOMIC_FETCH_ADD(set->size, 1);
+            rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);
 
             if (UNLIKELY(prev_size > set->capacity / 2)) {
                 concurrent_set_try_resize(set_obj, set_obj_ptr);
@@ -294,16 +293,16 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
                 goto retry;
             }
 
-            curr_key = RUBY_ATOMIC_VALUE_CAS(entry->key, CONCURRENT_SET_EMPTY, key);
+            curr_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
             if (curr_key == CONCURRENT_SET_EMPTY) {
-                RUBY_ATOMIC_VALUE_SET(entry->hash, hash);
+                rbimpl_atomic_value_store(&entry->hash, hash, RBIMPL_ATOMIC_RELAXED);
 
                 RB_GC_GUARD(set_obj);
                 return key;
             }
             else {
                 // Entry was not inserted.
-                RUBY_ATOMIC_DEC(set->size);
+                rbimpl_atomic_sub(&set->size, 1, RBIMPL_ATOMIC_RELAXED);
 
                 // Another thread won the race, try again at the same location.
                 continue;
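
The insert path above claims an empty slot with a CAS whose last two arguments appear (by analogy with C11) to be the success and failure orders: release on success so the newly created key is published to readers that acquire-load the slot, relaxed on failure because the losing thread only retries. A minimal C11 analogue (illustrative only, not the commit's code):

#include <stdatomic.h>
#include <stdbool.h>

/* Try to claim an empty slot by installing `key`.
 * Success uses release so the key's initialization becomes visible to any
 * thread that later acquire-loads the slot; failure uses relaxed because the
 * caller just retries. */
static bool
claim_slot(_Atomic(unsigned long) *slot, unsigned long empty, unsigned long key)
{
    unsigned long expected = empty;
    return atomic_compare_exchange_strong_explicit(
        slot, &expected, key,
        memory_order_release,
        memory_order_relaxed);
}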
@@ -317,13 +316,13 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
 
             goto retry;
           default: {
-            VALUE curr_hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
+            VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
             if (curr_hash != 0 && curr_hash != hash) break;
 
             if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                 // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
                 // Skip it and mark it as deleted.
-                RUBY_ATOMIC_VALUE_CAS(entry->key, curr_key, CONCURRENT_SET_DELETED);
+                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
                 break;
             }
 
@@ -363,7 +362,7 @@ rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
 
     while (true) {
         struct concurrent_set_entry *entry = &set->entries[idx];
-        VALUE curr_key = RUBY_ATOMIC_VALUE_LOAD(entry->key);
+        VALUE curr_key = entry->key;
 
         switch (curr_key) {
           case CONCURRENT_SET_EMPTY: