From 544770d566ae6b9cbdb79b24a555f10f60b3e5e3 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Tue, 30 Dec 2025 13:42:21 -0500 Subject: [PATCH] [ruby/mmtk] Process obj_free candidates in parallel This commit allows objects that are safe to be freed in parallel to do so. A decrease in object freeing time can be seen in profiles. The benchmarks don't show much difference. Before: -------------- -------------------- ---------- --------- bench sequential free (ms) stddev (%) RSS (MiB) activerecord 242.3 7.4 84.3 chunky-png 439.1 0.6 75.6 erubi-rails 1221.2 4.2 132.7 hexapdf 1544.8 1.8 429.1 liquid-c 42.7 7.4 48.5 liquid-compile 41.4 8.3 52.2 liquid-render 100.6 3.0 56.8 mail 108.9 2.1 65.1 psych-load 1536.9 0.6 43.4 railsbench 1633.5 2.6 146.2 rubocop 126.5 15.8 142.1 ruby-lsp 129.6 9.7 112.2 sequel 47.9 6.5 44.6 shipit 1152.0 2.7 315.2 -------------- -------------------- ---------- --------- After: -------------- ------------------ ---------- --------- bench parallel free (ms) stddev (%) RSS (MiB) activerecord 235.1 5.5 87.4 chunky-png 440.8 0.8 68.1 erubi-rails 1105.3 0.8 128.0 hexapdf 1578.3 4.1 405.1 liquid-c 42.6 7.1 48.4 liquid-compile 41.5 8.1 52.1 liquid-render 101.2 2.8 53.3 mail 109.7 2.7 64.8 psych-load 1567.7 1.1 44.4 railsbench 1644.9 1.9 150.9 rubocop 125.6 15.4 148.5 ruby-lsp 127.9 5.8 104.6 sequel 48.2 6.1 44.1 shipit 1215.3 4.7 320.5 -------------- ------------------ ---------- --------- https://github.com/ruby/mmtk/commit/4f0b5fd2eb --- gc/mmtk/mmtk.c | 23 ++++++++- gc/mmtk/mmtk.h | 2 +- gc/mmtk/src/api.rs | 11 ++-- gc/mmtk/src/weak_proc.rs | 106 ++++++++++++++++++++++++++++++--------- 4 files changed, 114 insertions(+), 28 deletions(-) diff --git a/gc/mmtk/mmtk.c b/gc/mmtk/mmtk.c index b532d3a774..f61130b4f6 100644 --- a/gc/mmtk/mmtk.c +++ b/gc/mmtk/mmtk.c @@ -696,6 +696,27 @@ rb_mmtk_alloc_fast_path(struct objspace *objspace, struct MMTk_ractor_cache *rac } } +static bool +obj_can_parallel_free_p(VALUE obj) +{ + switch (RB_BUILTIN_TYPE(obj)) { + case T_ARRAY: + case T_BIGNUM: + case T_COMPLEX: + case T_FLOAT: + case T_HASH: + case T_OBJECT: + case T_RATIONAL: + case T_REGEXP: + case T_STRING: + case T_STRUCT: + case T_SYMBOL: + return true; + default: + return false; + } +} + VALUE rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags, bool wb_protected, size_t alloc_size) { @@ -732,7 +753,7 @@ rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags mmtk_post_alloc(ractor_cache->mutator, (void*)alloc_obj, alloc_size, MMTK_ALLOCATION_SEMANTICS_DEFAULT); // TODO: only add when object needs obj_free to be called - mmtk_add_obj_free_candidate(alloc_obj); + mmtk_add_obj_free_candidate(alloc_obj, obj_can_parallel_free_p((VALUE)alloc_obj)); objspace->total_allocated_objects++; diff --git a/gc/mmtk/mmtk.h b/gc/mmtk/mmtk.h index f7da0f95f0..51798c4240 100644 --- a/gc/mmtk/mmtk.h +++ b/gc/mmtk/mmtk.h @@ -122,7 +122,7 @@ void mmtk_post_alloc(MMTk_Mutator *mutator, size_t bytes, MMTk_AllocationSemantics semantics); -void mmtk_add_obj_free_candidate(MMTk_ObjectReference object); +void mmtk_add_obj_free_candidate(MMTk_ObjectReference object, bool can_parallel_free); void mmtk_declare_weak_references(MMTk_ObjectReference object); diff --git a/gc/mmtk/src/api.rs b/gc/mmtk/src/api.rs index ec0e5bafe2..3515a2408b 100644 --- a/gc/mmtk/src/api.rs +++ b/gc/mmtk/src/api.rs @@ -198,7 +198,10 @@ pub unsafe extern "C" fn mmtk_init_binding( let mmtk_boxed = mmtk_init(&builder); let mmtk_static = Box::leak(Box::new(mmtk_boxed)); - let binding = RubyBinding::new(mmtk_static, &binding_options, upcalls); + let mut binding = RubyBinding::new(mmtk_static, &binding_options, upcalls); + binding + .weak_proc + .init_parallel_obj_free_candidates(memory_manager::num_of_workers(binding.mmtk)); crate::BINDING .set(binding) @@ -296,8 +299,10 @@ pub unsafe extern "C" fn mmtk_post_alloc( // TODO: Replace with buffered mmtk_add_obj_free_candidates #[no_mangle] -pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference) { - binding().weak_proc.add_obj_free_candidate(object) +pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference, can_parallel_free: bool) { + binding() + .weak_proc + .add_obj_free_candidate(object, can_parallel_free) } // =============== Weak references =============== diff --git a/gc/mmtk/src/weak_proc.rs b/gc/mmtk/src/weak_proc.rs index 3184c4f6d1..8bb8262544 100644 --- a/gc/mmtk/src/weak_proc.rs +++ b/gc/mmtk/src/weak_proc.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Mutex; use mmtk::{ @@ -9,10 +11,13 @@ use mmtk::{ use crate::{abi::GCThreadTLS, upcalls, Ruby}; pub struct WeakProcessor { + non_parallel_obj_free_candidates: Mutex>, + parallel_obj_free_candidates: Vec>>, + parallel_obj_free_candidates_counter: AtomicUsize, + /// Objects that needs `obj_free` called when dying. /// If it is a bottleneck, replace it with a lock-free data structure, /// or add candidates in batch. - obj_free_candidates: Mutex>, weak_references: Mutex>, } @@ -25,32 +30,59 @@ impl Default for WeakProcessor { impl WeakProcessor { pub fn new() -> Self { Self { - obj_free_candidates: Mutex::new(Vec::new()), + non_parallel_obj_free_candidates: Mutex::new(Vec::new()), + parallel_obj_free_candidates: vec![Mutex::new(Vec::new())], + parallel_obj_free_candidates_counter: AtomicUsize::new(0), weak_references: Mutex::new(Vec::new()), } } + pub fn init_parallel_obj_free_candidates(&mut self, num_workers: usize) { + debug_assert_eq!(self.parallel_obj_free_candidates.len(), 1); + + for _ in 1..num_workers { + self.parallel_obj_free_candidates + .push(Mutex::new(Vec::new())); + } + } + /// Add an object as a candidate for `obj_free`. /// /// Multiple mutators can call it concurrently, so it has `&self`. - pub fn add_obj_free_candidate(&self, object: ObjectReference) { - let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap(); - obj_free_candidates.push(object); - } + pub fn add_obj_free_candidate(&self, object: ObjectReference, can_parallel_free: bool) { + if can_parallel_free { + // Newly allocated objects are placed in parallel_obj_free_candidates using + // round-robin. This may not be ideal for load balancing. + let idx = self + .parallel_obj_free_candidates_counter + .fetch_add(1, Ordering::Relaxed) + % self.parallel_obj_free_candidates.len(); - /// Add many objects as candidates for `obj_free`. - /// - /// Multiple mutators can call it concurrently, so it has `&self`. - pub fn add_obj_free_candidates(&self, objects: &[ObjectReference]) { - let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap(); - for object in objects.iter().copied() { - obj_free_candidates.push(object); + self.parallel_obj_free_candidates[idx] + .lock() + .unwrap() + .push(object); + } else { + self.non_parallel_obj_free_candidates + .lock() + .unwrap() + .push(object); } } pub fn get_all_obj_free_candidates(&self) -> Vec { - let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap(); - std::mem::take(obj_free_candidates.as_mut()) + // let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap(); + let mut all_obj_free_candidates = self + .non_parallel_obj_free_candidates + .lock() + .unwrap() + .to_vec(); + + for candidates_mutex in &self.parallel_obj_free_candidates { + all_obj_free_candidates.extend(candidates_mutex.lock().unwrap().to_vec()); + } + + std::mem::take(all_obj_free_candidates.as_mut()) } pub fn add_weak_reference(&self, object: ObjectReference) { @@ -63,7 +95,22 @@ impl WeakProcessor { worker: &mut GCWorker, _tracer_context: impl ObjectTracerContext, ) { - worker.add_work(WorkBucketStage::VMRefClosure, ProcessObjFreeCandidates); + worker.add_work( + WorkBucketStage::VMRefClosure, + ProcessObjFreeCandidates { + process_type: ProcessObjFreeCandidatesType::NonParallel, + }, + ); + + for i in 0..self.parallel_obj_free_candidates.len() { + worker.add_work( + WorkBucketStage::VMRefClosure, + ProcessObjFreeCandidates { + process_type: ProcessObjFreeCandidatesType::Parallel(i), + }, + ); + } + worker.add_work(WorkBucketStage::VMRefClosure, ProcessWeakReferences); worker.add_work(WorkBucketStage::Prepare, UpdateFinalizerObjIdTables); @@ -80,16 +127,29 @@ impl WeakProcessor { } } -struct ProcessObjFreeCandidates; +enum ProcessObjFreeCandidatesType { + NonParallel, + Parallel(usize), +} + +struct ProcessObjFreeCandidates { + process_type: ProcessObjFreeCandidatesType, +} impl GCWork for ProcessObjFreeCandidates { fn do_work(&mut self, _worker: &mut GCWorker, _mmtk: &'static mmtk::MMTK) { - // If it blocks, it is a bug. - let mut obj_free_candidates = crate::binding() - .weak_proc - .obj_free_candidates - .try_lock() - .expect("It's GC time. No mutators should hold this lock at this time."); + let mut obj_free_candidates = match self.process_type { + ProcessObjFreeCandidatesType::NonParallel => crate::binding() + .weak_proc + .non_parallel_obj_free_candidates + .try_lock() + .expect("Lock for non_parallel_obj_free_candidates should not be held"), + ProcessObjFreeCandidatesType::Parallel(idx) => { + crate::binding().weak_proc.parallel_obj_free_candidates[idx] + .try_lock() + .expect("Lock for parallel_obj_free_candidates should not be held") + } + }; let n_cands = obj_free_candidates.len();