mirror of
https://github.com/ruby/ruby.git
synced 2026-01-26 12:14:51 +00:00
Convert Queue and SizedQueue to rb builtin
A large part of `thread_sync.c` was migrated already, might as well go all the way. It also allow to remove a bunch of Rdoc commands.
This commit is contained in:
parent
e7695ba3d9
commit
16feb46fa2
Notes:
git
2026-01-02 23:08:11 +00:00
@ -2226,7 +2226,7 @@ CODE
|
||||
def test_thread_add_trace_func
|
||||
events = []
|
||||
base_line = __LINE__
|
||||
q = Thread::Queue.new
|
||||
q = []
|
||||
t = Thread.new{
|
||||
Thread.current.add_trace_func proc{|ev, file, line, *args|
|
||||
events << [ev, line] if file == __FILE__
|
||||
|
||||
515
thread_sync.c
515
thread_sync.c
@ -2,8 +2,7 @@
|
||||
#include "ccan/list/list.h"
|
||||
#include "builtin.h"
|
||||
|
||||
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
|
||||
static VALUE rb_eClosedQueueError;
|
||||
static VALUE rb_cMutex, rb_eClosedQueueError;
|
||||
|
||||
/* Mutex */
|
||||
typedef struct rb_mutex_struct {
|
||||
@ -83,30 +82,6 @@ static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
|
||||
#endif
|
||||
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial);
|
||||
|
||||
/*
|
||||
* Document-class: Thread::Mutex
|
||||
*
|
||||
* Thread::Mutex implements a simple semaphore that can be used to
|
||||
* coordinate access to shared data from multiple concurrent threads.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* semaphore = Thread::Mutex.new
|
||||
*
|
||||
* a = Thread.new {
|
||||
* semaphore.synchronize {
|
||||
* # access shared resource
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* b = Thread.new {
|
||||
* semaphore.synchronize {
|
||||
* # access shared resource
|
||||
* }
|
||||
* }
|
||||
*
|
||||
*/
|
||||
|
||||
static size_t
|
||||
rb_mutex_num_waiting(rb_mutex_t *mutex)
|
||||
{
|
||||
@ -759,19 +734,19 @@ queue_alloc(VALUE klass)
|
||||
return obj;
|
||||
}
|
||||
|
||||
static int
|
||||
static inline bool
|
||||
queue_fork_check(struct rb_queue *q)
|
||||
{
|
||||
rb_serial_t fork_gen = GET_VM()->fork_gen;
|
||||
|
||||
if (q->fork_gen == fork_gen) {
|
||||
return 0;
|
||||
if (RB_LIKELY(q->fork_gen == fork_gen)) {
|
||||
return false;
|
||||
}
|
||||
/* forked children can't reach into parent thread stacks */
|
||||
q->fork_gen = fork_gen;
|
||||
ccan_list_head_init(&q->waitq);
|
||||
q->num_waiting = 0;
|
||||
return 1;
|
||||
return true;
|
||||
}
|
||||
|
||||
static inline struct rb_queue *
|
||||
@ -870,7 +845,7 @@ raw_szqueue_ptr(VALUE obj)
|
||||
struct rb_szqueue *sq;
|
||||
|
||||
TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
|
||||
if (queue_fork_check(&sq->q)) {
|
||||
if (RB_UNLIKELY(queue_fork_check(&sq->q))) {
|
||||
ccan_list_head_init(szqueue_pushq(sq));
|
||||
sq->num_waiting_push = 0;
|
||||
}
|
||||
@ -886,7 +861,7 @@ szqueue_ptr(VALUE obj)
|
||||
return sq;
|
||||
}
|
||||
|
||||
static int
|
||||
static inline bool
|
||||
queue_closed_p(VALUE self)
|
||||
{
|
||||
return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
|
||||
@ -967,76 +942,16 @@ ring_buffer_shift(struct rb_queue *q)
|
||||
return obj;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-class: Thread::Queue
|
||||
*
|
||||
* The Thread::Queue class implements multi-producer, multi-consumer
|
||||
* queues. It is especially useful in threaded programming when
|
||||
* information must be exchanged safely between multiple threads. The
|
||||
* Thread::Queue class implements all the required locking semantics.
|
||||
*
|
||||
* The class implements FIFO (first in, first out) type of queue.
|
||||
* In a FIFO queue, the first tasks added are the first retrieved.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* queue = Thread::Queue.new
|
||||
*
|
||||
* producer = Thread.new do
|
||||
* 5.times do |i|
|
||||
* sleep rand(i) # simulate expense
|
||||
* queue << i
|
||||
* puts "#{i} produced"
|
||||
* end
|
||||
* end
|
||||
*
|
||||
* consumer = Thread.new do
|
||||
* 5.times do |i|
|
||||
* value = queue.pop
|
||||
* sleep rand(i/2) # simulate expense
|
||||
* puts "consumed #{value}"
|
||||
* end
|
||||
* end
|
||||
*
|
||||
* consumer.join
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* Document-method: Queue::new
|
||||
*
|
||||
* call-seq:
|
||||
* Thread::Queue.new -> empty_queue
|
||||
* Thread::Queue.new(enumerable) -> queue
|
||||
*
|
||||
* Creates a new queue instance, optionally using the contents of an +enumerable+
|
||||
* for its initial state.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* q = Thread::Queue.new
|
||||
* #=> #<Thread::Queue:0x00007ff7501110d0>
|
||||
* q.empty?
|
||||
* #=> true
|
||||
*
|
||||
* q = Thread::Queue.new([1, 2, 3])
|
||||
* #=> #<Thread::Queue:0x00007ff7500ec500>
|
||||
* q.empty?
|
||||
* #=> false
|
||||
* q.pop
|
||||
* #=> 1
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
|
||||
queue_initialize(rb_execution_context_t *ec, VALUE self, VALUE initial)
|
||||
{
|
||||
VALUE initial;
|
||||
struct rb_queue *q = raw_queue_ptr(self);
|
||||
if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
|
||||
initial = rb_to_array(initial);
|
||||
}
|
||||
ccan_list_head_init(&q->waitq);
|
||||
if (argc == 1) {
|
||||
if (NIL_P(initial)) {
|
||||
ring_buffer_init(q, QUEUE_INITIAL_CAPA);
|
||||
}
|
||||
else {
|
||||
initial = rb_to_array(initial);
|
||||
long len = RARRAY_LEN(initial);
|
||||
long initial_capa = QUEUE_INITIAL_CAPA;
|
||||
while (initial_capa < len) {
|
||||
@ -1046,9 +961,6 @@ rb_queue_initialize(int argc, VALUE *argv, VALUE self)
|
||||
MEMCPY(q->buffer, RARRAY_CONST_PTR(initial), VALUE, len);
|
||||
q->len = len;
|
||||
}
|
||||
else {
|
||||
ring_buffer_init(q, QUEUE_INITIAL_CAPA);
|
||||
}
|
||||
return self;
|
||||
}
|
||||
|
||||
@ -1064,82 +976,6 @@ queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
|
||||
return self;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::Queue#close
|
||||
* call-seq:
|
||||
* close
|
||||
*
|
||||
* Closes the queue. A closed queue cannot be re-opened.
|
||||
*
|
||||
* After the call to close completes, the following are true:
|
||||
*
|
||||
* - +closed?+ will return true
|
||||
*
|
||||
* - +close+ will be ignored.
|
||||
*
|
||||
* - calling enq/push/<< will raise a +ClosedQueueError+.
|
||||
*
|
||||
* - when +empty?+ is false, calling deq/pop/shift will return an object
|
||||
* from the queue as usual.
|
||||
* - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
|
||||
* deq(true) will raise a +ThreadError+.
|
||||
*
|
||||
* ClosedQueueError is inherited from StopIteration, so that you can break loop block.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* q = Thread::Queue.new
|
||||
* Thread.new{
|
||||
* while e = q.deq # wait for nil to break loop
|
||||
* # ...
|
||||
* end
|
||||
* }
|
||||
* q.close
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_close(VALUE self)
|
||||
{
|
||||
struct rb_queue *q = queue_ptr(self);
|
||||
|
||||
if (!queue_closed_p(self)) {
|
||||
FL_SET(self, QUEUE_CLOSED);
|
||||
|
||||
wakeup_all(&q->waitq);
|
||||
}
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::Queue#closed?
|
||||
* call-seq: closed?
|
||||
*
|
||||
* Returns +true+ if the queue is closed.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_closed_p(VALUE self)
|
||||
{
|
||||
return RBOOL(queue_closed_p(self));
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::Queue#push
|
||||
* call-seq:
|
||||
* push(object)
|
||||
* enq(object)
|
||||
* <<(object)
|
||||
*
|
||||
* Pushes the given +object+ to the queue.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_push(VALUE self, VALUE obj)
|
||||
{
|
||||
return queue_do_push(self, queue_ptr(self), obj);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
queue_sleep(VALUE _args)
|
||||
{
|
||||
@ -1231,19 +1067,6 @@ rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE time
|
||||
return queue_do_pop(ec, self, queue_ptr(self), non_block, timeout);
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::Queue#empty?
|
||||
* call-seq: empty?
|
||||
*
|
||||
* Returns +true+ if the queue is empty.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_empty_p(VALUE self)
|
||||
{
|
||||
return RBOOL(queue_ptr(self)->len == 0);
|
||||
}
|
||||
|
||||
static void
|
||||
queue_clear(struct rb_queue *q)
|
||||
{
|
||||
@ -1251,87 +1074,12 @@ queue_clear(struct rb_queue *q)
|
||||
q->offset = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::Queue#clear
|
||||
*
|
||||
* Removes all objects from the queue.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_clear(VALUE self)
|
||||
szqueue_initialize(rb_execution_context_t *ec, VALUE self, VALUE vmax)
|
||||
{
|
||||
queue_clear(queue_ptr(self));
|
||||
return self;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::Queue#length
|
||||
* call-seq:
|
||||
* length
|
||||
* size
|
||||
*
|
||||
* Returns the length of the queue.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_length(VALUE self)
|
||||
{
|
||||
return LONG2NUM(queue_ptr(self)->len);
|
||||
}
|
||||
|
||||
NORETURN(static VALUE rb_queue_freeze(VALUE self));
|
||||
/*
|
||||
* call-seq:
|
||||
* freeze
|
||||
*
|
||||
* The queue can't be frozen, so this method raises an exception:
|
||||
* Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
|
||||
*
|
||||
*/
|
||||
static VALUE
|
||||
rb_queue_freeze(VALUE self)
|
||||
{
|
||||
rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
|
||||
UNREACHABLE_RETURN(self);
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::Queue#num_waiting
|
||||
*
|
||||
* Returns the number of threads waiting on the queue.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_num_waiting(VALUE self)
|
||||
{
|
||||
struct rb_queue *q = queue_ptr(self);
|
||||
|
||||
return INT2NUM(q->num_waiting);
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-class: Thread::SizedQueue
|
||||
*
|
||||
* This class represents queues of specified size capacity. The push operation
|
||||
* may be blocked if the capacity is full.
|
||||
*
|
||||
* See Thread::Queue for an example of how a Thread::SizedQueue works.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Document-method: SizedQueue::new
|
||||
* call-seq: new(max)
|
||||
*
|
||||
* Creates a fixed-length queue with a maximum size of +max+.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_szqueue_initialize(VALUE self, VALUE vmax)
|
||||
{
|
||||
long max;
|
||||
long max = NUM2LONG(vmax);
|
||||
struct rb_szqueue *sq = raw_szqueue_ptr(self);
|
||||
|
||||
max = NUM2LONG(vmax);
|
||||
if (max <= 0) {
|
||||
rb_raise(rb_eArgError, "queue size must be positive");
|
||||
}
|
||||
@ -1343,68 +1091,6 @@ rb_szqueue_initialize(VALUE self, VALUE vmax)
|
||||
return self;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::SizedQueue#close
|
||||
* call-seq:
|
||||
* close
|
||||
*
|
||||
* Similar to Thread::Queue#close.
|
||||
*
|
||||
* The difference is behavior with waiting enqueuing threads.
|
||||
*
|
||||
* If there are waiting enqueuing threads, they are interrupted by
|
||||
* raising ClosedQueueError('queue closed').
|
||||
*/
|
||||
static VALUE
|
||||
rb_szqueue_close(VALUE self)
|
||||
{
|
||||
if (!queue_closed_p(self)) {
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
|
||||
FL_SET(self, QUEUE_CLOSED);
|
||||
wakeup_all(szqueue_waitq(sq));
|
||||
wakeup_all(szqueue_pushq(sq));
|
||||
}
|
||||
return self;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::SizedQueue#max
|
||||
*
|
||||
* Returns the maximum size of the queue.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_szqueue_max_get(VALUE self)
|
||||
{
|
||||
return LONG2NUM(szqueue_ptr(self)->max);
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::SizedQueue#max=
|
||||
* call-seq: max=(number)
|
||||
*
|
||||
* Sets the maximum size of the queue to the given +number+.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_szqueue_max_set(VALUE self, VALUE vmax)
|
||||
{
|
||||
long max = NUM2LONG(vmax);
|
||||
long diff = 0;
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
|
||||
if (max <= 0) {
|
||||
rb_raise(rb_eArgError, "queue size must be positive");
|
||||
}
|
||||
if (max > sq->max) {
|
||||
diff = max - sq->max;
|
||||
}
|
||||
sq->max = max;
|
||||
sync_wakeup(szqueue_pushq(sq), diff);
|
||||
return vmax;
|
||||
}
|
||||
|
||||
static VALUE
|
||||
rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
|
||||
{
|
||||
@ -1464,124 +1150,12 @@ rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE ti
|
||||
return retval;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::SizedQueue#clear
|
||||
*
|
||||
* Removes all objects from the queue.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_szqueue_clear(VALUE self)
|
||||
{
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
queue_clear(&sq->q);
|
||||
wakeup_all(szqueue_pushq(sq));
|
||||
return self;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::SizedQueue#num_waiting
|
||||
*
|
||||
* Returns the number of threads waiting on the queue.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_szqueue_num_waiting(VALUE self)
|
||||
{
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
|
||||
return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
|
||||
}
|
||||
|
||||
|
||||
/* ConditionalVariable */
|
||||
struct rb_condvar {
|
||||
struct ccan_list_head waitq;
|
||||
rb_serial_t fork_gen;
|
||||
};
|
||||
|
||||
/*
|
||||
* Document-class: Thread::ConditionVariable
|
||||
*
|
||||
* ConditionVariable objects augment class Mutex. Using condition variables,
|
||||
* it is possible to suspend while in the middle of a critical section until a
|
||||
* condition is met, such as a resource becomes available.
|
||||
*
|
||||
* Due to non-deterministic scheduling and spurious wake-ups, users of
|
||||
* condition variables should always use a separate boolean predicate (such as
|
||||
* reading from a boolean variable) to check if the condition is actually met
|
||||
* before starting to wait, and should wait in a loop, re-checking the
|
||||
* condition every time the ConditionVariable is waken up. The idiomatic way
|
||||
* of using condition variables is calling the +wait+ method in an +until+
|
||||
* loop with the predicate as the loop condition.
|
||||
*
|
||||
* condvar.wait(mutex) until condition_is_met
|
||||
*
|
||||
* In the example below, we use the boolean variable +resource_available+
|
||||
* (which is protected by +mutex+) to indicate the availability of the
|
||||
* resource, and use +condvar+ to wait for that variable to become true. Note
|
||||
* that:
|
||||
*
|
||||
* 1. Thread +b+ may be scheduled before thread +a1+ and +a2+, and may run so
|
||||
* fast that it have already made the resource available before either
|
||||
* +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if
|
||||
* +resource_available+ is already true before starting to wait.
|
||||
* 2. The +wait+ method may spuriously wake up without signalling. Therefore,
|
||||
* thread +a1+ and +a2+ should recheck +resource_available+ after the
|
||||
* +wait+ method returns, and go back to wait if the condition is not
|
||||
* actually met.
|
||||
* 3. It is possible that thread +a2+ starts right after thread +a1+ is waken
|
||||
* up by +b+. Thread +a2+ may have acquired the +mutex+ and consumed the
|
||||
* resource before thread +a1+ acquires the +mutex+. This necessitates
|
||||
* rechecking after +wait+, too.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* mutex = Thread::Mutex.new
|
||||
*
|
||||
* resource_available = false
|
||||
* condvar = Thread::ConditionVariable.new
|
||||
*
|
||||
* a1 = Thread.new {
|
||||
* # Thread 'a1' waits for the resource to become available and consumes
|
||||
* # the resource.
|
||||
* mutex.synchronize {
|
||||
* condvar.wait(mutex) until resource_available
|
||||
* # After the loop, 'resource_available' is guaranteed to be true.
|
||||
*
|
||||
* resource_available = false
|
||||
* puts "a1 consumed the resource"
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* a2 = Thread.new {
|
||||
* # Thread 'a2' behaves like 'a1'.
|
||||
* mutex.synchronize {
|
||||
* condvar.wait(mutex) until resource_available
|
||||
* resource_available = false
|
||||
* puts "a2 consumed the resource"
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* b = Thread.new {
|
||||
* # Thread 'b' periodically makes the resource available.
|
||||
* loop {
|
||||
* mutex.synchronize {
|
||||
* resource_available = true
|
||||
*
|
||||
* # Notify one waiting thread if any. It is possible that neither
|
||||
* # 'a1' nor 'a2 is waiting on 'condvar' at this moment. That's OK.
|
||||
* condvar.signal
|
||||
* }
|
||||
* sleep 1
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* # Eventually both 'a1' and 'a2' will have their resources, albeit in an
|
||||
* # unspecified order.
|
||||
* [a1, a2].each {|th| th.join}
|
||||
*/
|
||||
|
||||
static size_t
|
||||
condvar_memsize(const void *ptr)
|
||||
{
|
||||
@ -1679,75 +1253,24 @@ rb_condvar_broadcast(rb_execution_context_t *ec, VALUE self)
|
||||
return self;
|
||||
}
|
||||
|
||||
NORETURN(static VALUE undumpable(VALUE obj));
|
||||
/* :nodoc: */
|
||||
static VALUE
|
||||
undumpable(VALUE obj)
|
||||
{
|
||||
rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
|
||||
UNREACHABLE_RETURN(Qnil);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
define_thread_class(VALUE outer, const ID name, VALUE super)
|
||||
{
|
||||
VALUE klass = rb_define_class_id_under(outer, name, super);
|
||||
rb_const_set(rb_cObject, name, klass);
|
||||
return klass;
|
||||
}
|
||||
|
||||
static void
|
||||
Init_thread_sync(void)
|
||||
{
|
||||
#undef rb_intern
|
||||
#if defined(TEACH_RDOC) && TEACH_RDOC == 42
|
||||
rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
|
||||
rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
|
||||
rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
|
||||
rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
|
||||
#endif
|
||||
|
||||
#define DEFINE_CLASS(name, super) \
|
||||
rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
|
||||
|
||||
/* Mutex */
|
||||
DEFINE_CLASS(Mutex, Object);
|
||||
rb_cMutex = rb_define_class_id_under(rb_cThread, rb_intern("Mutex"), rb_cObject);
|
||||
rb_define_alloc_func(rb_cMutex, mutex_alloc);
|
||||
|
||||
/* Queue */
|
||||
DEFINE_CLASS(Queue, Object);
|
||||
VALUE rb_cQueue = rb_define_class_id_under_no_pin(rb_cThread, rb_intern("Queue"), rb_cObject);
|
||||
rb_define_alloc_func(rb_cQueue, queue_alloc);
|
||||
|
||||
rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
|
||||
|
||||
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
|
||||
rb_undef_method(rb_cQueue, "initialize_copy");
|
||||
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
|
||||
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
|
||||
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
||||
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
|
||||
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
|
||||
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
|
||||
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
|
||||
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
|
||||
rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0);
|
||||
|
||||
rb_define_alias(rb_cQueue, "enq", "push");
|
||||
rb_define_alias(rb_cQueue, "<<", "push");
|
||||
rb_define_alias(rb_cQueue, "size", "length");
|
||||
|
||||
DEFINE_CLASS(SizedQueue, Queue);
|
||||
VALUE rb_cSizedQueue = rb_define_class_id_under_no_pin(rb_cThread, rb_intern("SizedQueue"), rb_cQueue);
|
||||
rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
|
||||
|
||||
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
|
||||
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
|
||||
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
||||
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
||||
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
|
||||
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
|
||||
|
||||
/* CVar */
|
||||
DEFINE_CLASS(ConditionVariable, Object);
|
||||
VALUE rb_cConditionVariable = rb_define_class_id_under_no_pin(rb_cThread, rb_intern("ConditionVariable"), rb_cObject);
|
||||
rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
|
||||
|
||||
id_sleep = rb_intern("sleep");
|
||||
|
||||
344
thread_sync.rb
344
thread_sync.rb
@ -1,7 +1,62 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class Thread
|
||||
# The Thread::Queue class implements multi-producer, multi-consumer
|
||||
# queues. It is especially useful in threaded programming when
|
||||
# information must be exchanged safely between multiple threads. The
|
||||
# Thread::Queue class implements all the required locking semantics.
|
||||
#
|
||||
# The class implements FIFO (first in, first out) type of queue.
|
||||
# In a FIFO queue, the first tasks added are the first retrieved.
|
||||
#
|
||||
# Example:
|
||||
#
|
||||
# queue = Thread::Queue.new
|
||||
#
|
||||
# producer = Thread.new do
|
||||
# 5.times do |i|
|
||||
# sleep rand(i) # simulate expense
|
||||
# queue << i
|
||||
# puts "#{i} produced"
|
||||
# end
|
||||
# end
|
||||
#
|
||||
# consumer = Thread.new do
|
||||
# 5.times do |i|
|
||||
# value = queue.pop
|
||||
# sleep rand(i/2) # simulate expense
|
||||
# puts "consumed #{value}"
|
||||
# end
|
||||
# end
|
||||
#
|
||||
# consumer.join
|
||||
class Queue
|
||||
# Document-method: Queue::new
|
||||
#
|
||||
# call-seq:
|
||||
# Thread::Queue.new -> empty_queue
|
||||
# Thread::Queue.new(enumerable) -> queue
|
||||
#
|
||||
# Creates a new queue instance, optionally using the contents of an +enumerable+
|
||||
# for its initial state.
|
||||
#
|
||||
# Example:
|
||||
#
|
||||
# q = Thread::Queue.new
|
||||
# #=> #<Thread::Queue:0x00007ff7501110d0>
|
||||
# q.empty?
|
||||
# #=> true
|
||||
#
|
||||
# q = Thread::Queue.new([1, 2, 3])
|
||||
# #=> #<Thread::Queue:0x00007ff7500ec500>
|
||||
# q.empty?
|
||||
# #=> false
|
||||
# q.pop
|
||||
# #=> 1
|
||||
def initialize(enumerable = nil)
|
||||
Primitive.queue_initialize(enumerable)
|
||||
end
|
||||
|
||||
# call-seq:
|
||||
# pop(non_block=false, timeout: nil)
|
||||
#
|
||||
@ -21,9 +76,129 @@ class Thread
|
||||
end
|
||||
alias_method :deq, :pop
|
||||
alias_method :shift, :pop
|
||||
|
||||
undef_method :initialize_copy
|
||||
|
||||
# call-seq:
|
||||
# push(object)
|
||||
# enq(object)
|
||||
# <<(object)
|
||||
#
|
||||
# Pushes the given +object+ to the queue.
|
||||
def push(object)
|
||||
Primitive.cexpr!('queue_do_push(self, queue_ptr(self), object)')
|
||||
end
|
||||
alias_method :enq, :push
|
||||
alias_method :<<, :push
|
||||
|
||||
# call-seq:
|
||||
# close
|
||||
#
|
||||
# Closes the queue. A closed queue cannot be re-opened.
|
||||
#
|
||||
# After the call to close completes, the following are true:
|
||||
#
|
||||
# - +closed?+ will return true
|
||||
#
|
||||
# - +close+ will be ignored.
|
||||
#
|
||||
# - calling enq/push/<< will raise a +ClosedQueueError+.
|
||||
#
|
||||
# - when +empty?+ is false, calling deq/pop/shift will return an object
|
||||
# from the queue as usual.
|
||||
# - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
|
||||
# deq(true) will raise a +ThreadError+.
|
||||
#
|
||||
# ClosedQueueError is inherited from StopIteration, so that you can break loop block.
|
||||
#
|
||||
# Example:
|
||||
#
|
||||
# q = Thread::Queue.new
|
||||
# Thread.new{
|
||||
# while e = q.deq # wait for nil to break loop
|
||||
# # ...
|
||||
# end
|
||||
# }
|
||||
# q.close
|
||||
def close
|
||||
Primitive.cstmt! %{
|
||||
if (!queue_closed_p(self)) {
|
||||
FL_SET_RAW(self, QUEUE_CLOSED);
|
||||
|
||||
wakeup_all(&queue_ptr(self)->waitq);
|
||||
}
|
||||
|
||||
return self;
|
||||
}
|
||||
end
|
||||
|
||||
# call-seq: closed?
|
||||
#
|
||||
# Returns +true+ if the queue is closed.
|
||||
def closed?
|
||||
Primitive.cexpr!('RBOOL(FL_TEST_RAW(self, QUEUE_CLOSED))')
|
||||
end
|
||||
|
||||
# call-seq:
|
||||
# length
|
||||
# size
|
||||
#
|
||||
# Returns the length of the queue.
|
||||
def length
|
||||
Primitive.cexpr!('LONG2NUM(queue_ptr(self)->len)')
|
||||
end
|
||||
alias_method :size, :length
|
||||
|
||||
# call-seq: empty?
|
||||
#
|
||||
# Returns +true+ if the queue is empty.
|
||||
def empty?
|
||||
Primitive.cexpr!('RBOOL(queue_ptr(self)->len == 0)')
|
||||
end
|
||||
|
||||
# Removes all objects from the queue.
|
||||
def clear
|
||||
Primitive.cstmt! %{
|
||||
queue_clear(queue_ptr(self));
|
||||
return self;
|
||||
}
|
||||
end
|
||||
|
||||
# call-seq:
|
||||
# num_waiting
|
||||
#
|
||||
# Returns the number of threads waiting on the queue.
|
||||
def num_waiting
|
||||
Primitive.cexpr!('INT2NUM(queue_ptr(self)->num_waiting)')
|
||||
end
|
||||
|
||||
def marshal_dump # :nodoc:
|
||||
raise TypeError, "can't dump #{self.class}"
|
||||
end
|
||||
|
||||
# call-seq:
|
||||
# freeze
|
||||
#
|
||||
# The queue can't be frozen, so this method raises an exception:
|
||||
# Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
|
||||
def freeze
|
||||
raise TypeError, "cannot freeze #{self}"
|
||||
end
|
||||
end
|
||||
|
||||
class SizedQueue
|
||||
# This class represents queues of specified size capacity. The push operation
|
||||
# may be blocked if the capacity is full.
|
||||
#
|
||||
# See Thread::Queue for an example of how a Thread::SizedQueue works.
|
||||
class SizedQueue < Queue
|
||||
# Document-method: SizedQueue::new
|
||||
# call-seq: new(max)
|
||||
#
|
||||
# Creates a fixed-length queue with a maximum size of +max+.
|
||||
def initialize(vmax)
|
||||
Primitive.szqueue_initialize(vmax)
|
||||
end
|
||||
|
||||
# call-seq:
|
||||
# pop(non_block=false, timeout: nil)
|
||||
#
|
||||
@ -66,8 +241,93 @@ class Thread
|
||||
end
|
||||
alias_method :enq, :push
|
||||
alias_method :<<, :push
|
||||
|
||||
# call-seq:
|
||||
# close
|
||||
#
|
||||
# Similar to Thread::Queue#close.
|
||||
#
|
||||
# The difference is behavior with waiting enqueuing threads.
|
||||
#
|
||||
# If there are waiting enqueuing threads, they are interrupted by
|
||||
# raising ClosedQueueError('queue closed').
|
||||
def close
|
||||
Primitive.cstmt! %{
|
||||
if (!queue_closed_p(self)) {
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
|
||||
FL_SET(self, QUEUE_CLOSED);
|
||||
wakeup_all(szqueue_waitq(sq));
|
||||
wakeup_all(szqueue_pushq(sq));
|
||||
}
|
||||
return self;
|
||||
}
|
||||
end
|
||||
|
||||
# Removes all objects from the queue.
|
||||
def clear
|
||||
Primitive.cstmt! %{
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
queue_clear(&sq->q);
|
||||
wakeup_all(szqueue_pushq(sq));
|
||||
return self;
|
||||
}
|
||||
end
|
||||
|
||||
# Returns the number of threads waiting on the queue.
|
||||
def num_waiting
|
||||
Primitive.cstmt! %{
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
|
||||
}
|
||||
end
|
||||
|
||||
# Returns the maximum size of the queue.
|
||||
def max
|
||||
Primitive.cexpr!('LONG2NUM(szqueue_ptr(self)->max)')
|
||||
end
|
||||
|
||||
# call-seq: max=(number)
|
||||
#
|
||||
# Sets the maximum size of the queue to the given +number+.
|
||||
def max=(vmax)
|
||||
Primitive.cstmt! %{
|
||||
long max = NUM2LONG(vmax);
|
||||
if (max <= 0) {
|
||||
rb_raise(rb_eArgError, "queue size must be positive");
|
||||
}
|
||||
|
||||
long diff = 0;
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
|
||||
if (max > sq->max) {
|
||||
diff = max - sq->max;
|
||||
}
|
||||
sq->max = max;
|
||||
sync_wakeup(szqueue_pushq(sq), diff);
|
||||
return vmax;
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
# Thread::Mutex implements a simple semaphore that can be used to
|
||||
# coordinate access to shared data from multiple concurrent threads.
|
||||
#
|
||||
# Example:
|
||||
#
|
||||
# semaphore = Thread::Mutex.new
|
||||
#
|
||||
# a = Thread.new {
|
||||
# semaphore.synchronize {
|
||||
# # access shared resource
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# b = Thread.new {
|
||||
# semaphore.synchronize {
|
||||
# # access shared resource
|
||||
# }
|
||||
# }
|
||||
class Mutex
|
||||
# call-seq:
|
||||
# Thread::Mutex.new -> mutex
|
||||
@ -149,6 +409,83 @@ class Thread
|
||||
end
|
||||
end
|
||||
|
||||
# ConditionVariable objects augment class Mutex. Using condition variables,
|
||||
# it is possible to suspend while in the middle of a critical section until a
|
||||
# condition is met, such as a resource becomes available.
|
||||
#
|
||||
# Due to non-deterministic scheduling and spurious wake-ups, users of
|
||||
# condition variables should always use a separate boolean predicate (such as
|
||||
# reading from a boolean variable) to check if the condition is actually met
|
||||
# before starting to wait, and should wait in a loop, re-checking the
|
||||
# condition every time the ConditionVariable is waken up. The idiomatic way
|
||||
# of using condition variables is calling the +wait+ method in an +until+
|
||||
# loop with the predicate as the loop condition.
|
||||
#
|
||||
# condvar.wait(mutex) until condition_is_met
|
||||
#
|
||||
# In the example below, we use the boolean variable +resource_available+
|
||||
# (which is protected by +mutex+) to indicate the availability of the
|
||||
# resource, and use +condvar+ to wait for that variable to become true. Note
|
||||
# that:
|
||||
#
|
||||
# 1. Thread +b+ may be scheduled before thread +a1+ and +a2+, and may run so
|
||||
# fast that it have already made the resource available before either
|
||||
# +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if
|
||||
# +resource_available+ is already true before starting to wait.
|
||||
# 2. The +wait+ method may spuriously wake up without signalling. Therefore,
|
||||
# thread +a1+ and +a2+ should recheck +resource_available+ after the
|
||||
# +wait+ method returns, and go back to wait if the condition is not
|
||||
# actually met.
|
||||
# 3. It is possible that thread +a2+ starts right after thread +a1+ is waken
|
||||
# up by +b+. Thread +a2+ may have acquired the +mutex+ and consumed the
|
||||
# resource before thread +a1+ acquires the +mutex+. This necessitates
|
||||
# rechecking after +wait+, too.
|
||||
#
|
||||
# Example:
|
||||
#
|
||||
# mutex = Thread::Mutex.new
|
||||
#
|
||||
# resource_available = false
|
||||
# condvar = Thread::ConditionVariable.new
|
||||
#
|
||||
# a1 = Thread.new {
|
||||
# # Thread 'a1' waits for the resource to become available and consumes
|
||||
# # the resource.
|
||||
# mutex.synchronize {
|
||||
# condvar.wait(mutex) until resource_available
|
||||
# # After the loop, 'resource_available' is guaranteed to be true.
|
||||
#
|
||||
# resource_available = false
|
||||
# puts "a1 consumed the resource"
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# a2 = Thread.new {
|
||||
# # Thread 'a2' behaves like 'a1'.
|
||||
# mutex.synchronize {
|
||||
# condvar.wait(mutex) until resource_available
|
||||
# resource_available = false
|
||||
# puts "a2 consumed the resource"
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# b = Thread.new {
|
||||
# # Thread 'b' periodically makes the resource available.
|
||||
# loop {
|
||||
# mutex.synchronize {
|
||||
# resource_available = true
|
||||
#
|
||||
# # Notify one waiting thread if any. It is possible that neither
|
||||
# # 'a1' nor 'a2 is waiting on 'condvar' at this moment. That's OK.
|
||||
# condvar.signal
|
||||
# }
|
||||
# sleep 1
|
||||
# }
|
||||
# }
|
||||
#
|
||||
# # Eventually both 'a1' and 'a2' will have their resources, albeit in an
|
||||
# # unspecified order.
|
||||
# [a1, a2].each {|th| th.join}
|
||||
class ConditionVariable
|
||||
# Document-method: ConditionVariable::new
|
||||
#
|
||||
@ -193,3 +530,8 @@ class Thread
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Mutex = Thread::Mutex
|
||||
ConditionVariable = Thread::ConditionVariable
|
||||
Queue = Thread::Queue
|
||||
SizedQueue = Thread::SizedQueue
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user