diff --git a/concurrent_set.c b/concurrent_set.c
index eebf7df9cb..376b20d7d4 100644
--- a/concurrent_set.c
+++ b/concurrent_set.c
@@ -222,11 +222,14 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
     VALUE set_obj;
     VALUE hash = 0;

+    struct concurrent_set *set;
+    struct concurrent_set_probe probe;
+    int idx;
+
   retry:
     set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
     RUBY_ASSERT(set_obj);
-    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
+    set = RTYPEDDATA_GET_DATA(set_obj);

     if (hash == 0) {
         // We don't need to recompute the hash on every retry because it should
@@ -235,8 +238,7 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
     }
     RUBY_ASSERT(hash == concurrent_set_hash(set, key));

-    struct concurrent_set_probe probe;
-    int idx = concurrent_set_probe_start(&probe, set, hash);
+    idx = concurrent_set_probe_start(&probe, set, hash);

     while (true) {
         struct concurrent_set_entry *entry = &set->entries[idx];
@@ -299,37 +301,43 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
 {
     RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

-    bool inserting = false;
-    VALUE set_obj;
-    VALUE hash = 0;
-
-  retry:
-    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
-    RUBY_ASSERT(set_obj);
-    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
-
-    if (hash == 0) {
-        // We don't need to recompute the hash on every retry because it should
-        // never change.
-        hash = concurrent_set_hash(set, key);
+    // First attempt to find
+    {
+        VALUE result = rb_concurrent_set_find(set_obj_ptr, key);
+        if (result) return result;
     }
-    RUBY_ASSERT(hash == concurrent_set_hash(set, key));
+
+    // The first time through we need to call create and store the hash
+    VALUE set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
+    RUBY_ASSERT(set_obj);
+
+    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
+    key = set->funcs->create(key, data);
+    VALUE hash = concurrent_set_hash(set, key);

     struct concurrent_set_probe probe;
-    int idx = concurrent_set_probe_start(&probe, set, hash);
+    int idx;
+
+    goto start_search;
+
+  retry:
+    // On retries we only need to reload the set object
+    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
+    RUBY_ASSERT(set_obj);
+    set = RTYPEDDATA_GET_DATA(set_obj);
+
+    RUBY_ASSERT(hash == concurrent_set_hash(set, key));
+
+  start_search:
+    idx = concurrent_set_probe_start(&probe, set, hash);

     while (true) {
         struct concurrent_set_entry *entry = &set->entries[idx];
         VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
         VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;

         if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
-            if (!inserting) {
-                key = set->funcs->create(key, data);
-                RUBY_ASSERT(hash == concurrent_set_hash(set, key));
-                inserting = true;
-            }
-
             // Reserve this slot for our hash value
             curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
             if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
@@ -340,6 +348,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
             // CAS succeeded, so these are the values stored
             curr_hash_and_flags = hash;
             curr_hash = hash;
+            // Fall through to try to claim key
         }

@@ -350,13 +359,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
         VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

         switch (curr_key) {
-          case CONCURRENT_SET_EMPTY:
-            if (!inserting) {
-                key = set->funcs->create(key, data);
-                RUBY_ASSERT(hash == concurrent_set_hash(set, key));
-                inserting = true;
-            }
-
+          case CONCURRENT_SET_EMPTY: {
             rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

             // Load_factor reached at 75% full. ex: prev_size: 32, capacity: 64, load_factor: 50%.
@@ -369,6 +372,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
             VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
             if (prev_key == CONCURRENT_SET_EMPTY) {
+                RUBY_ASSERT(rb_concurrent_set_find(set_obj_ptr, key) == key);
                 RB_GC_GUARD(set_obj);
                 return key;
             }
@@ -379,6 +383,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
                 // Another thread won the race, try again at the same location.
                 continue;
             }
+          }
           case CONCURRENT_SET_DELETED:
             break;
           case CONCURRENT_SET_MOVED:
@@ -386,22 +391,26 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
             RB_VM_LOCKING();

             goto retry;
           default:
+            // We never GC during our search
+            // If the continuation bit wasn't set at the start of our search,
+            // any concurrent find with the same hash value would also look at
+            // this location and try to swap curr_key
             if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
-                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
-                // Skip it and let the GC pass clean it up
-                break;
+                if (continuation) {
+                    goto probe_next;
+                }
+                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_EMPTY, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+                continue;
             }

             if (set->funcs->cmp(key, curr_key)) {
-                // We've found a match.
+                // We've found a live match.
                 RB_GC_GUARD(set_obj);
-                if (inserting) {
-                    // We created key using set->funcs->create, but we didn't end
-                    // up inserting it into the set. Free it here to prevent memory
-                    // leaks.
-                    if (set->funcs->free) set->funcs->free(key);
-                }
+                // We created key using set->funcs->create, but we didn't end
+                // up inserting it into the set. Free it here to prevent memory
+                // leaks.
+                if (set->funcs->free) set->funcs->free(key);
                 return curr_key;
             }
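Note for reviewers: the sketch below is a minimal, self-contained model of the control flow this patch moves rb_concurrent_set_find_or_insert to: a lookup-only pass first, a single up-front create on a miss, a CAS publish, and a free of the speculative copy when another thread wins with an equal key. It uses C11 atomics and a toy string table; toy_hash, toy_find, and toy_find_or_insert are made-up names, and there is no resizing, tombstoning, continuation bit, or GC handling here, so it illustrates the pattern rather than the CRuby implementation.

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define TOY_SLOTS 16

static _Atomic(char *) toy_table[TOY_SLOTS];

static size_t
toy_hash(const char *s)
{
    size_t h = 5381;
    while (*s) h = h * 33 + (unsigned char)*s++;
    return h;
}

// Lookup-only pass: returns the published entry equal to s, or NULL on a miss.
static char *
toy_find(const char *s)
{
    size_t idx = toy_hash(s) % TOY_SLOTS;
    for (size_t i = 0; i < TOY_SLOTS; i++) {
        char *curr = atomic_load_explicit(&toy_table[(idx + i) % TOY_SLOTS], memory_order_acquire);
        if (curr == NULL) return NULL;          // empty slot: key is not present
        if (strcmp(curr, s) == 0) return curr;  // live match
    }
    return NULL;
}

// Find-or-insert in the same shape as the patched function: find first,
// create exactly once, then probe and publish with CAS; if another thread
// publishes an equal key first, free our copy and return theirs.
static char *
toy_find_or_insert(const char *s)
{
    char *found = toy_find(s);
    if (found) return found;

    char *created = strdup(s);                  // stand-in for set->funcs->create
    size_t idx = toy_hash(s) % TOY_SLOTS;

    for (;;) {
        char *curr = atomic_load_explicit(&toy_table[idx], memory_order_acquire);
        if (curr == NULL) {
            char *expected = NULL;
            if (atomic_compare_exchange_strong_explicit(&toy_table[idx], &expected, created,
                                                        memory_order_release,
                                                        memory_order_relaxed)) {
                return created;                 // we published our copy
            }
            continue;                           // lost the race: re-read this slot and retry
        }
        if (strcmp(curr, s) == 0) {
            free(created);                      // stand-in for set->funcs->free
            return curr;
        }
        idx = (idx + 1) % TOY_SLOTS;            // linear probe to the next slot
    }
}

int
main(void)
{
    char *a = toy_find_or_insert("hello");
    char *b = toy_find_or_insert("hello");
    printf("interned to the same pointer: %s\n", a == b ? "yes" : "no");
    return 0;
}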