class Thread::Queue

Thread::Queue 类实现了多生产者、多消费者队列。当需要在多个线程之间安全地交换信息时,它在线程编程中特别有用。Thread::Queue 类实现了所有必需的锁定语义。

此类实现了 FIFO (先进先出) 类型的队列。在 FIFO 队列中,先添加的任务先被检索。

示例

queue = Thread::Queue.new

producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join

公共类方法

Thread::Queue.new → empty_queue 单击切换源代码
Thread::Queue.new(enumerable) → queue

创建一个新的队列实例,可以选择使用 enumerable 的内容作为其初始状态。

示例

q = Thread::Queue.new
#=> #<Thread::Queue:0x00007ff7501110d0>
q.empty?
#=> true

q = Thread::Queue.new([1, 2, 3])
#=> #<Thread::Queue:0x00007ff7500ec500>
q.empty?
#=> false
q.pop
#=> 1
static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE initial;
    struct rb_queue *q = queue_ptr(self);
    if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
        initial = rb_to_array(initial);
    }
    RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
    ccan_list_head_init(queue_waitq(q));
    if (argc == 1) {
        rb_ary_concat(q->que, initial);
    }
    return self;
}

公共实例方法

<<(object)

将给定的 object 推送到队列。

别名为:push
clear() 单击切换源代码

从队列中移除所有对象。

static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    rb_ary_clear(check_array(self, q->que));
    return self;
}
close 单击切换源代码

关闭队列。关闭后的队列无法重新打开。

在调用 close 完成后,以下情况为真

  • closed? 将返回 true

  • close 将被忽略。

  • 调用 enq/push/<< 将引发 ClosedQueueError

  • empty? 为 false 时,调用 deq/pop/shift 将像往常一样从队列返回一个对象。

  • empty? 为 true 时,deq(false) 将不会挂起线程,并返回 nil。deq(true) 将引发 ThreadError

ClosedQueueError 继承自 StopIteration,因此你可以中断循环块。

示例

q = Thread::Queue.new
Thread.new{
  while e = q.deq # wait for nil to break loop
    # ...
  end
}
q.close
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);

        wakeup_all(queue_waitq(q));
    }

    return self;
}
closed? 单击切换源代码

如果队列已关闭,则返回 true

static VALUE
rb_queue_closed_p(VALUE self)
{
    return RBOOL(queue_closed_p(self));
}
deq(non_block = false, timeout: nil)
别名为:pop
empty? 单击切换源代码

如果队列为空,则返回 true

static VALUE
rb_queue_empty_p(VALUE self)
{
    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
}
enq(object)

将给定的 object 推送到队列。

别名为:push
freeze 单击切换源代码

队列无法被冻结,因此此方法会引发异常

Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
static VALUE
rb_queue_freeze(VALUE self)
{
    rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
    UNREACHABLE_RETURN(self);
}
length 单击切换源代码

返回队列的长度。

static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}
也别名为:size
num_waiting() 单击切换源代码

返回等待队列的线程数。

static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    return INT2NUM(q->num_waiting);
}
pop(non_block=false, timeout: nil) 单击切换源代码

从队列中检索数据。

如果队列为空,则调用线程将挂起,直到有数据被推送到队列中。如果 non_block 为 true,则线程不会挂起,并且会引发 ThreadError

如果已过去 timeout 秒且没有可用的数据,则返回 nil。如果 timeout0,则立即返回。

# File ruby_3_4_1/thread_sync.rb, line 14
def pop(non_block = false, timeout: nil)
  if non_block && timeout
    raise ArgumentError, "can't set a timeout if non_block is enabled"
  end
  Primitive.rb_queue_pop(non_block, timeout)
end
也别名为:deqshift
push(object) 单击切换源代码

将给定的 object 推送到队列。

static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}
也别名为:enq, <<
shift(non_block = false, timeout: nil)
别名为:pop
size

返回队列的长度。

别名为:length