[ruby/mmtk] Process obj_free candidates in parallel

This commit allows objects whose type is safe to free concurrently to be freed in parallel across GC worker threads.
A decrease in object freeing time can be seen in profiles.

The benchmarks don't show much difference.

Before:

--------------  --------------------  ----------  ---------
bench           sequential free (ms)  stddev (%)  RSS (MiB)
activerecord    242.3                 7.4         84.3
chunky-png      439.1                 0.6         75.6
erubi-rails     1221.2                4.2         132.7
hexapdf         1544.8                1.8         429.1
liquid-c        42.7                  7.4         48.5
liquid-compile  41.4                  8.3         52.2
liquid-render   100.6                 3.0         56.8
mail            108.9                 2.1         65.1
psych-load      1536.9                0.6         43.4
railsbench      1633.5                2.6         146.2
rubocop         126.5                 15.8        142.1
ruby-lsp        129.6                 9.7         112.2
sequel          47.9                  6.5         44.6
shipit          1152.0                2.7         315.2
--------------  --------------------  ----------  ---------

After:

--------------  ------------------  ----------  ---------
bench           parallel free (ms)  stddev (%)  RSS (MiB)
activerecord    235.1               5.5         87.4
chunky-png      440.8               0.8         68.1
erubi-rails     1105.3              0.8         128.0
hexapdf         1578.3              4.1         405.1
liquid-c        42.6                7.1         48.4
liquid-compile  41.5                8.1         52.1
liquid-render   101.2               2.8         53.3
mail            109.7               2.7         64.8
psych-load      1567.7              1.1         44.4
railsbench      1644.9              1.9         150.9
rubocop         125.6               15.4        148.5
ruby-lsp        127.9               5.8         104.6
sequel          48.2                6.1         44.1
shipit          1215.3              4.7         320.5
--------------  ------------------  ----------  ---------

https://github.com/ruby/mmtk/commit/4f0b5fd2eb
This commit is contained in:
Peter Zhu 2025-12-30 13:42:21 -05:00 committed by git
parent 99249cc582
commit 544770d566
4 changed files with 114 additions and 28 deletions

View File

@ -696,6 +696,27 @@ rb_mmtk_alloc_fast_path(struct objspace *objspace, struct MMTk_ractor_cache *rac
}
}
/* Returns true when an object of this builtin type may have obj_free
 * called on it from multiple GC worker threads concurrently.
 * NOTE(review): presumably the listed types' free paths touch no shared
 * VM state -- confirm against each type's obj_free implementation.
 * Any type not listed falls through to the sequential free path. */
static bool
obj_can_parallel_free_p(VALUE obj)
{
switch (RB_BUILTIN_TYPE(obj)) {
case T_ARRAY:
case T_BIGNUM:
case T_COMPLEX:
case T_FLOAT:
case T_HASH:
case T_OBJECT:
case T_RATIONAL:
case T_REGEXP:
case T_STRING:
case T_STRUCT:
case T_SYMBOL:
return true;
/* Everything else (e.g. T_DATA, T_FILE, T_CLASS, ...) is freed
 * sequentially. */
default:
return false;
}
}
VALUE
rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags, bool wb_protected, size_t alloc_size)
{
@ -732,7 +753,7 @@ rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags
mmtk_post_alloc(ractor_cache->mutator, (void*)alloc_obj, alloc_size, MMTK_ALLOCATION_SEMANTICS_DEFAULT);
// TODO: only add when object needs obj_free to be called
mmtk_add_obj_free_candidate(alloc_obj);
mmtk_add_obj_free_candidate(alloc_obj, obj_can_parallel_free_p((VALUE)alloc_obj));
objspace->total_allocated_objects++;

View File

@ -122,7 +122,7 @@ void mmtk_post_alloc(MMTk_Mutator *mutator,
size_t bytes,
MMTk_AllocationSemantics semantics);
void mmtk_add_obj_free_candidate(MMTk_ObjectReference object);
void mmtk_add_obj_free_candidate(MMTk_ObjectReference object, bool can_parallel_free);
void mmtk_declare_weak_references(MMTk_ObjectReference object);

View File

@ -198,7 +198,10 @@ pub unsafe extern "C" fn mmtk_init_binding(
let mmtk_boxed = mmtk_init(&builder);
let mmtk_static = Box::leak(Box::new(mmtk_boxed));
let binding = RubyBinding::new(mmtk_static, &binding_options, upcalls);
let mut binding = RubyBinding::new(mmtk_static, &binding_options, upcalls);
binding
.weak_proc
.init_parallel_obj_free_candidates(memory_manager::num_of_workers(binding.mmtk));
crate::BINDING
.set(binding)
@ -296,8 +299,10 @@ pub unsafe extern "C" fn mmtk_post_alloc(
// TODO: Replace with buffered mmtk_add_obj_free_candidates
#[no_mangle]
pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference) {
binding().weak_proc.add_obj_free_candidate(object)
pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference, can_parallel_free: bool) {
binding()
.weak_proc
.add_obj_free_candidate(object, can_parallel_free)
}
// =============== Weak references ===============

View File

@ -1,3 +1,5 @@
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use mmtk::{
@ -9,10 +11,13 @@ use mmtk::{
use crate::{abi::GCThreadTLS, upcalls, Ruby};
pub struct WeakProcessor {
non_parallel_obj_free_candidates: Mutex<Vec<ObjectReference>>,
parallel_obj_free_candidates: Vec<Mutex<Vec<ObjectReference>>>,
parallel_obj_free_candidates_counter: AtomicUsize,
/// Objects that need `obj_free` called when dying.
/// If it is a bottleneck, replace it with a lock-free data structure,
/// or add candidates in batch.
obj_free_candidates: Mutex<Vec<ObjectReference>>,
weak_references: Mutex<Vec<ObjectReference>>,
}
@ -25,32 +30,59 @@ impl Default for WeakProcessor {
impl WeakProcessor {
pub fn new() -> Self {
Self {
obj_free_candidates: Mutex::new(Vec::new()),
non_parallel_obj_free_candidates: Mutex::new(Vec::new()),
parallel_obj_free_candidates: vec![Mutex::new(Vec::new())],
parallel_obj_free_candidates_counter: AtomicUsize::new(0),
weak_references: Mutex::new(Vec::new()),
}
}
/// Grow the parallel obj_free candidate lists from the single list
/// created by `new()` to one list per GC worker thread.
///
/// Takes `&mut self`: this is called once, during binding
/// initialization (see `mmtk_init_binding`), before any mutator can
/// add candidates concurrently.
pub fn init_parallel_obj_free_candidates(&mut self, num_workers: usize) {
// `new()` seeds exactly one list; more than one means this ran twice.
debug_assert_eq!(self.parallel_obj_free_candidates.len(), 1);
// Start at 1 because one list already exists.
for _ in 1..num_workers {
self.parallel_obj_free_candidates
.push(Mutex::new(Vec::new()));
}
}
/// Add an object as a candidate for `obj_free`.
///
/// Multiple mutators can call it concurrently, so it has `&self`.
pub fn add_obj_free_candidate(&self, object: ObjectReference) {
let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
obj_free_candidates.push(object);
}
pub fn add_obj_free_candidate(&self, object: ObjectReference, can_parallel_free: bool) {
if can_parallel_free {
// Newly allocated objects are placed in parallel_obj_free_candidates using
// round-robin. This may not be ideal for load balancing.
let idx = self
.parallel_obj_free_candidates_counter
.fetch_add(1, Ordering::Relaxed)
% self.parallel_obj_free_candidates.len();
/// Add many objects as candidates for `obj_free`.
///
/// Multiple mutators can call it concurrently, so it has `&self`.
pub fn add_obj_free_candidates(&self, objects: &[ObjectReference]) {
let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
for object in objects.iter().copied() {
obj_free_candidates.push(object);
self.parallel_obj_free_candidates[idx]
.lock()
.unwrap()
.push(object);
} else {
self.non_parallel_obj_free_candidates
.lock()
.unwrap()
.push(object);
}
}
/// Collect every pending obj_free candidate — the non-parallel list
/// plus all per-worker parallel lists — into a single `Vec`.
///
/// NOTE(review): the previous implementation drained the list with
/// `std::mem::take`, but `to_vec()` below *clones* and leaves the
/// candidates in place — confirm callers expect the lists to stay
/// populated, otherwise candidates could be processed twice.
pub fn get_all_obj_free_candidates(&self) -> Vec<ObjectReference> {
let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
std::mem::take(obj_free_candidates.as_mut())
// NOTE(review): leftover commented-out code — delete before merging.
// let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
let mut all_obj_free_candidates = self
.non_parallel_obj_free_candidates
.lock()
.unwrap()
.to_vec();
for candidates_mutex in &self.parallel_obj_free_candidates {
all_obj_free_candidates.extend(candidates_mutex.lock().unwrap().to_vec());
}
// NOTE(review): `all_obj_free_candidates` is already an owned local;
// `mem::take` here just swaps in an empty Vec and returns the old one.
// Returning `all_obj_free_candidates` directly would be equivalent.
std::mem::take(all_obj_free_candidates.as_mut())
}
pub fn add_weak_reference(&self, object: ObjectReference) {
@ -63,7 +95,22 @@ impl WeakProcessor {
worker: &mut GCWorker<Ruby>,
_tracer_context: impl ObjectTracerContext<Ruby>,
) {
worker.add_work(WorkBucketStage::VMRefClosure, ProcessObjFreeCandidates);
worker.add_work(
WorkBucketStage::VMRefClosure,
ProcessObjFreeCandidates {
process_type: ProcessObjFreeCandidatesType::NonParallel,
},
);
for i in 0..self.parallel_obj_free_candidates.len() {
worker.add_work(
WorkBucketStage::VMRefClosure,
ProcessObjFreeCandidates {
process_type: ProcessObjFreeCandidatesType::Parallel(i),
},
);
}
worker.add_work(WorkBucketStage::VMRefClosure, ProcessWeakReferences);
worker.add_work(WorkBucketStage::Prepare, UpdateFinalizerObjIdTables);
@ -80,16 +127,29 @@ impl WeakProcessor {
}
}
struct ProcessObjFreeCandidates;
/// Selects which candidate list a `ProcessObjFreeCandidates` work
/// packet drains.
enum ProcessObjFreeCandidatesType {
/// The single list of objects that must be freed sequentially.
NonParallel,
/// The parallel candidate list at the given index (one per GC worker).
Parallel(usize),
}
/// GC work packet that runs obj_free over one candidate list. One
/// `NonParallel` packet plus one `Parallel` packet per list is
/// scheduled into the `VMRefClosure` bucket, letting the parallel
/// lists be processed by different workers concurrently.
struct ProcessObjFreeCandidates {
process_type: ProcessObjFreeCandidatesType,
}
impl GCWork<Ruby> for ProcessObjFreeCandidates {
fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) {
// If it blocks, it is a bug.
let mut obj_free_candidates = crate::binding()
.weak_proc
.obj_free_candidates
.try_lock()
.expect("It's GC time. No mutators should hold this lock at this time.");
let mut obj_free_candidates = match self.process_type {
ProcessObjFreeCandidatesType::NonParallel => crate::binding()
.weak_proc
.non_parallel_obj_free_candidates
.try_lock()
.expect("Lock for non_parallel_obj_free_candidates should not be held"),
ProcessObjFreeCandidatesType::Parallel(idx) => {
crate::binding().weak_proc.parallel_obj_free_candidates[idx]
.try_lock()
.expect("Lock for parallel_obj_free_candidates should not be held")
}
};
let n_cands = obj_free_candidates.len();