class Fiber

Fiber 是在 Ruby 中实现轻量级协同并发的原语。基本上,它们是一种创建可以暂停和恢复的代码块的方法,很像线程。主要的区别在于它们永远不会被抢占,并且调度必须由程序员而不是虚拟机来完成。

与其他无栈轻量级并发模型相反,每个 fiber 都带有一个堆栈。这使得 fiber 可以从 fiber 块内的深层嵌套函数调用中暂停。请参阅 ruby(1) 手册页以配置 fiber 堆栈的大小。

当创建一个 fiber 时,它不会自动运行。相反,必须使用 Fiber#resume 方法显式地要求其运行。在 fiber 内部运行的代码可以通过调用 Fiber.yield 来放弃控制权,在这种情况下,它会将控制权返回给调用者(Fiber#resume 的调用者)。

在 yield 或终止时,Fiber 返回最后执行的表达式的值。

例如

fiber = Fiber.new do
  Fiber.yield 1
  2
end

puts fiber.resume
puts fiber.resume
puts fiber.resume

会产生

1
2
FiberError: dead fiber called

Fiber#resume 方法接受任意数量的参数,如果是第一次调用 resume,它们将作为块参数传递。否则,它们将是调用 Fiber.yield 的返回值。

示例

fiber = Fiber.new do |first|
  second = Fiber.yield first + 2
end

puts fiber.resume 10
puts fiber.resume 1_000_000
puts fiber.resume "The fiber will be dead before I can cause trouble"

会产生

12
1000000
FiberError: dead fiber called

非阻塞 Fiber

非阻塞 fiber 的概念是在 Ruby 3.0 中引入的。当非阻塞 fiber 遇到通常会阻塞 fiber 的操作(例如 sleep,或等待另一个进程或 I/O)时,它会将控制权交给其他 fiber,并允许调度器处理阻塞和唤醒(恢复)此 fiber,使其可以继续执行。

为了使 Fiber 表现为非阻塞,需要在 Fiber.new 中使用 blocking: false(这是默认值)创建它,并且应该使用 Fiber.set_scheduler 设置 Fiber.scheduler。如果在当前线程中未设置 Fiber.scheduler,阻塞和非阻塞 fiber 的行为是相同的。

Ruby 不提供调度器类:它应该由用户实现,并对应于 Fiber::Scheduler

还有一个 Fiber.schedule 方法,它应该立即以非阻塞方式执行给定的块。它的实际实现取决于调度器。

公共类方法

Fiber[key] → value 点击切换源代码

返回由 key 标识的 fiber 存储变量的值。

key 必须是一个符号,该值由 Fiber#[]= 或 Fiber#store 设置。

另请参阅 Fiber::[]=

static VALUE
rb_fiber_storage_aref(VALUE class, VALUE key)
{
    key = rb_to_symbol(key);

    VALUE storage = fiber_storage_get(fiber_current(), FALSE);
    if (storage == Qnil) return Qnil;

    return rb_hash_aref(storage, key);
}
Fiber[key] = value 点击切换源代码

value 赋值给由 key 标识的 fiber 存储变量。如果该变量不存在,则会创建它。

key 必须是一个 Symbol,否则会引发 TypeError

另请参阅 Fiber::[]

static VALUE
rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value)
{
    key = rb_to_symbol(key);

    VALUE storage = fiber_storage_get(fiber_current(), value != Qnil);
    if (storage == Qnil) return Qnil;

    if (value == Qnil) {
        return rb_hash_delete(storage, key);
    }
    else {
        return rb_hash_aset(storage, key, value);
    }
}
blocking{|fiber| ...} → result 点击切换源代码

强制 fiber 在块的持续时间内为阻塞状态。返回块的结果。

有关详细信息,请参阅类文档中的“非阻塞 fiber”部分。

VALUE
rb_fiber_blocking(VALUE class)
{
    VALUE fiber_value = rb_fiber_current();
    rb_fiber_t *fiber = fiber_ptr(fiber_value);

    // If we are already blocking, this is essentially a no-op:
    if (fiber->blocking) {
        return rb_yield(fiber_value);
    }
    else {
        return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value);
    }
}
blocking? → false 或 1 点击切换源代码

如果当前 fiber 是非阻塞的,则返回 falseFiber 如果是通过将 blocking: false 传递给 Fiber.new 创建的,或者是通过 Fiber.schedule 创建的,则为非阻塞的。

如果当前的 Fiber 是阻塞的,该方法返回 1。未来的发展可能会允许返回更大的整数的情况。

请注意,即使该方法返回 falseFiber 的行为也仅在当前线程中设置了 Fiber.scheduler 时才会有所不同。

有关详细信息,请参阅类文档中的“非阻塞 fiber”部分。

static VALUE
rb_fiber_s_blocking_p(VALUE klass)
{
    rb_thread_t *thread = GET_THREAD();
    unsigned blocking = thread->blocking;

    if (blocking == 0)
        return Qfalse;

    return INT2NUM(blocking);
}
current → fiber 点击切换源代码

返回当前的 fiber。如果您不在 fiber 的上下文中运行,此方法将返回根 fiber。

static VALUE
rb_fiber_s_current(VALUE klass)
{
    return rb_fiber_current();
}
current_scheduler → obj 或 nil 点击切换源代码

返回 Fiber 调度器,该调度器是最后使用 Fiber.set_scheduler 为当前线程设置的,当且仅当当前 fiber 是非阻塞的。

static VALUE
rb_fiber_current_scheduler(VALUE klass)
{
    return rb_fiber_scheduler_current();
}
new(blocking: false, storage: true) { |*args| ... } → fiber 点击切换源代码

创建新的 Fiber。最初,fiber 不运行,可以使用 resume 恢复。传递给第一个 resume 调用的参数将传递给块。

f = Fiber.new do |initial|
   current = initial
   loop do
     puts "current: #{current.inspect}"
     current = Fiber.yield
   end
end
f.resume(100)     # prints: current: 100
f.resume(1, 2, 3) # prints: current: [1, 2, 3]
f.resume          # prints: current: nil
# ... and so on ...

如果将 blocking: false 传递给 Fiber.new并且当前线程定义了 Fiber.scheduler,则 Fiber 变为非阻塞的(请参阅类文档中的“非阻塞 Fiber”部分)。

如果未指定 storage,则默认是从当前 fiber 继承存储的副本。这与指定 storage: true 相同。

Fiber[:x] = 1
Fiber.new do
  Fiber[:x] # => 1
  Fiber[:x] = 2
end.resume
Fiber[:x] # => 1

如果给定的 storagenil,此函数将延迟初始化内部存储,该存储最初为空哈希。

Fiber[:x] = "Hello World"
Fiber.new(storage: nil) do
  Fiber[:x] # nil
end

否则,给定的 storage 将用作新 fiber 的存储,它必须是 Hash 的实例。

显式使用 storage: true 目前是实验性的,将来可能会发生变化。

static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
    return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}
schedule { |*args| ... } → fiber 点击切换源代码

该方法应该立即在单独的非阻塞 fiber 中运行提供的代码块。

puts "Go to sleep!"

Fiber.set_scheduler(MyScheduler.new)

Fiber.schedule do
  puts "Going to sleep"
  sleep(1)
  puts "I slept well"
end

puts "Wakey-wakey, sleepyhead"

假设 MyScheduler 已正确实现,此程序将产生

Go to sleep!
Going to sleep
Wakey-wakey, sleepyhead
...1 sec pause here...
I slept well

……例如,在 Fiber 内部的第一个阻塞操作 (sleep(1)) 上,控制权将让给外部代码(主 fiber),并且在该执行结束时,调度程序将负责正确地恢复所有阻塞的 fiber。

请注意,上述行为是该方法应该表现的方式,实际行为取决于当前调度程序对 Fiber::Scheduler#fiber 方法的实现。Ruby 不强制此方法以任何特定方式执行。

如果未设置调度程序,该方法会引发 RuntimeError (No scheduler is available!)

static VALUE
rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj)
{
    return rb_fiber_s_schedule_kw(argc, argv, rb_keyword_given_p());
}
scheduler → obj 或 nil 点击切换源代码

返回 Fiber 调度器,该调度器是最后使用 Fiber.set_scheduler 为当前线程设置的。如果未设置调度程序(这是默认值),则返回 nil,非阻塞 fiber 的行为与阻塞相同。(有关调度程序概念的详细信息,请参阅类文档中的“非阻塞 fiber”部分)。

static VALUE
rb_fiber_s_scheduler(VALUE klass)
{
    return rb_fiber_scheduler_get();
}
set_scheduler(scheduler) → scheduler 点击切换源代码

为当前线程设置 Fiber 调度器。如果设置了调度程序,非阻塞 fiber(通过 Fiber.new 创建,其中 blocking: false 或通过 Fiber.schedule 创建)会在潜在的阻塞操作上调用该调度程序的钩子方法,并且当前线程将在最终化时调用调度程序的 close 方法(允许调度程序正确管理所有未完成的 fiber)。

scheduler 可以是对应于 Fiber::Scheduler 的任何类的对象。它的实现取决于用户。

另请参阅类文档中的“非阻塞 fiber”部分。

static VALUE
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
{
    return rb_fiber_scheduler_set(scheduler);
}
yield(args, ...) → obj 点击切换源代码

将控制权交回恢复 fiber 的上下文,并传递传递给它的任何参数。当接下来调用 resume 时,fiber 将在此处继续处理。传递给下一个 resume 的任何参数都将是此 Fiber.yield 表达式评估为的值。

static VALUE
rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass)
{
    return rb_fiber_yield_kw(argc, argv, rb_keyword_given_p());
}

公共实例方法

alive? → true 或 false 点击切换源代码

如果 fiber 仍然可以恢复(或转移到),则返回 true。在完成 fiber 块的执行后,此方法将始终返回 false

VALUE
rb_fiber_alive_p(VALUE fiber_value)
{
    return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value)));
}
backtrace → array 点击切换源代码
backtrace(start) → array
backtrace(start, count) → array
backtrace(start..end) → array

返回 fiber 的当前执行堆栈。startcountend 允许仅选择回溯的部分。

def level3
  Fiber.yield
end

def level2
  level3
end

def level1
  level2
end

f = Fiber.new { level1 }

# It is empty before the fiber started
f.backtrace
#=> []

f.resume

f.backtrace
#=> ["test.rb:2:in `yield'", "test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"]
p f.backtrace(1) # start from the item 1
#=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"]
p f.backtrace(2, 2) # start from item 2, take 2
#=> ["test.rb:6:in `level2'", "test.rb:10:in `level1'"]
p f.backtrace(1..3) # take items from 1 to 3
#=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'"]

f.resume

# It is nil after the fiber is finished
f.backtrace
#=> nil
static VALUE
rb_fiber_backtrace(int argc, VALUE *argv, VALUE fiber)
{
    return rb_vm_backtrace(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
backtrace_locations → array 点击切换源代码
backtrace_locations(start) → array
backtrace_locations(start, count) → array
backtrace_locations(start..end) → array

类似于 backtrace,但将执行堆栈的每一行作为 Thread::Backtrace::Location 返回。接受与 backtrace 相同的参数。

f = Fiber.new { Fiber.yield }
f.resume
loc = f.backtrace_locations.first
loc.label  #=> "yield"
loc.path   #=> "test.rb"
loc.lineno #=> 1
static VALUE
rb_fiber_backtrace_locations(int argc, VALUE *argv, VALUE fiber)
{
    return rb_vm_backtrace_locations(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
blocking? → true 或 false 点击切换源代码

如果 fiber 处于阻塞状态,则返回 true,否则返回 false。如果 Fiber 是通过向 Fiber.new 传递 blocking: false 创建的,或者是通过 Fiber.schedule 创建的,则该 Fiber 为非阻塞状态。

请注意,即使该方法返回 false,只有当当前线程中设置了 Fiber.scheduler 时,该 Fiber 的行为才会不同。

有关详细信息,请参阅类文档中的“非阻塞 fiber”部分。

VALUE
rb_fiber_blocking_p(VALUE fiber)
{
    return RBOOL(fiber_ptr(fiber)->blocking);
}
inspect()
别名: to_s
kill → nil 点击切换源代码

通过引发不可捕获的异常来终止 Fiber。它只会终止给定的 Fiber,而不会终止其他 Fiber。如果该 Fiber 正在调用 resumetransfer,则向另一个 Fiber 返回 nil

只有当另一个 Fiber 处于 Fiber.yield 状态时,Fiber#kill 才会中断它。如果在当前 Fiber 上调用此方法,则会在 Fiber#kill 调用点引发异常。

如果 Fiber 尚未启动,则直接转换为终止状态。

如果 Fiber 已终止,则不执行任何操作。

如果对属于另一个线程的 Fiber 调用此方法,则引发 FiberError

static VALUE
rb_fiber_m_kill(VALUE self)
{
    rb_fiber_t *fiber = fiber_ptr(self);

    if (fiber->killed) return Qfalse;
    fiber->killed = 1;

    if (fiber->status == FIBER_CREATED) {
        fiber->status = FIBER_TERMINATED;
    }
    else if (fiber->status != FIBER_TERMINATED) {
        if (fiber_current() == fiber) {
            fiber_check_killed(fiber);
        }
        else {
            fiber_raise(fiber_ptr(self), Qnil);
        }
    }

    return self;
}
raise → obj 点击切换源代码
raise(string) → obj
raise(exception [, string [, array]]) → obj

在最后一次调用 Fiber.yield 的位置,在 Fiber 中引发异常。如果 Fiber 尚未启动或已经运行完成,则引发 FiberError。如果 Fiber 正在 yield,则恢复执行。如果正在 transfer,则将其 transfer 到。但是,如果正在 resuming,则引发 FiberError

如果没有参数,则引发 RuntimeError。如果只有一个 String 参数,则引发带有该字符串作为消息的 RuntimeError。否则,第一个参数应该是 Exception 类的名称(或者在发送 exception 消息时返回 Exception 对象的对象)。可选的第二个参数设置与异常关联的消息,第三个参数是回调信息数组。异常会被 begin...end 代码块的 rescue 子句捕获。

如果对属于另一个 ThreadFiber 调用此方法,则引发 FiberError

有关更多信息,请参阅 Kernel#raise

static VALUE
rb_fiber_m_raise(int argc, VALUE *argv, VALUE self)
{
    return rb_fiber_raise(self, argc, argv);
}
resume(args, ...) → obj 点击切换源代码

从最后一次调用 Fiber.yield 的位置恢复执行 Fiber,如果这是第一次调用 resume,则开始运行它。传递给 resume 的参数将是 Fiber.yield 表达式的值,或者如果这是第一次 resume,则作为块参数传递给 Fiber 的块。

或者,当调用 resume 时,它会计算为传递给 Fiber 块中下一个 Fiber.yield 语句的参数,或者如果它在没有任何 Fiber.yield 的情况下运行完成,则计算为块的值。

static VALUE
rb_fiber_m_resume(int argc, VALUE *argv, VALUE fiber)
{
    return rb_fiber_resume_kw(fiber, argc, argv, rb_keyword_given_p());
}
storage → hash (dup) 点击切换源代码

返回 Fiber 存储哈希的副本。该方法只能在 Fiber.current 上调用。

static VALUE
rb_fiber_storage_get(VALUE self)
{
    storage_access_must_be_from_same_fiber(self);

    VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE);

    if (storage == Qnil) {
        return Qnil;
    }
    else {
        return rb_obj_dup(storage);
    }
}
storage = hash 点击切换源代码

设置 Fiber 的存储哈希。此功能是实验性的,将来可能会发生变化。该方法只能在 Fiber.current 上调用。

您应该谨慎使用此方法,因为您可能会在不经意间清除重要的 Fiber 存储状态。您应该主要倾向于使用 Fiber::[]= 在存储中分配特定的键。

您还可以使用 Fiber.new(storage: nil) 创建一个具有空存储的 Fiber。

示例

while request = request_queue.pop
  # Reset the per-request state:
  Fiber.current.storage = nil
  handle_request(request)
end
static VALUE
rb_fiber_storage_set(VALUE self, VALUE value)
{
    if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) {
        rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL,
          "Fiber#storage= is experimental and may be removed in the future!");
    }

    storage_access_must_be_from_same_fiber(self);
    fiber_storage_validate(value);

    fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value);
    return value;
}
to_s() 点击切换源代码
static VALUE
fiber_to_s(VALUE fiber_value)
{
    const rb_fiber_t *fiber = fiber_ptr(fiber_value);
    const rb_proc_t *proc;
    char status_info[0x20];

    if (fiber->resuming_fiber) {
        snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status));
    }
    else {
        snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status));
    }

    if (!rb_obj_is_proc(fiber->first_proc)) {
        VALUE str = rb_any_to_s(fiber_value);
        strlcat(status_info, ">", sizeof(status_info));
        rb_str_set_len(str, RSTRING_LEN(str)-1);
        rb_str_cat_cstr(str, status_info);
        return str;
    }
    GetProcPtr(fiber->first_proc, proc);
    return rb_block_to_s(fiber_value, &proc->block, status_info);
}
别名: inspect
transfer(args, ...) → obj 点击切换源代码

将控制权转移到另一个 Fiber,从上次停止的位置恢复执行,如果之前未恢复执行,则启动它。调用 Fiber 将被暂停,就像调用 Fiber.yield 一样。

接收 transfer 调用的 Fiber 会将其视为 resume 调用。传递给 transfer 的参数会被视为传递给 resume 的参数。

在 Fiber 之间传递控制的两种方式(一种是 resumeFiber::yield,另一种是 transfer 在 Fiber 之间传递)不能随意混合。

  • 如果 Fiber 的生命周期是以 transfer 开始的,它将永远无法 yield 或恢复控制传递,只能完成或 transfer 回去。(它仍然可以恢复其他允许被恢复的 Fiber。)

  • 如果 Fiber 的生命周期是以 resume 开始的,它可以 yield 或 transfer 到另一个 Fiber,但只能以与它被放弃的方式兼容的方式接收回控制:如果它 transfer 了,则只能被 transfer 回去,如果它 yield 了,则只能被 resume 回去。之后,它可以再次 transfer 或 yield。

如果违反了这些规则,则会引发 FiberError

对于单独的 Fiber 设计,yield/resume 更易于使用(Fiber 只是放弃了控制权,它不需要考虑控制权被给予谁),而 transfer 对于复杂的情况更灵活,允许构建彼此依赖的任意 Fiber 图。

示例

manager = nil # For local var to be visible inside worker block

# This fiber would be started with transfer
# It can't yield, and can't be resumed
worker = Fiber.new { |work|
  puts "Worker: starts"
  puts "Worker: Performed #{work.inspect}, transferring back"
  # Fiber.yield     # this would raise FiberError: attempt to yield on a not resumed fiber
  # manager.resume  # this would raise FiberError: attempt to resume a resumed fiber (double resume)
  manager.transfer(work.capitalize)
}

# This fiber would be started with resume
# It can yield or transfer, and can be transferred
# back or resumed
manager = Fiber.new {
  puts "Manager: starts"
  puts "Manager: transferring 'something' to worker"
  result = worker.transfer('something')
  puts "Manager: worker returned #{result.inspect}"
  # worker.resume    # this would raise FiberError: attempt to resume a transferring fiber
  Fiber.yield        # this is OK, the fiber transferred from and to, now it can yield
  puts "Manager: finished"
}

puts "Starting the manager"
manager.resume
puts "Resuming the manager"
# manager.transfer  # this would raise FiberError: attempt to transfer to a yielding fiber
manager.resume

会产生

Starting the manager
Manager: starts
Manager: transferring 'something' to worker
Worker: starts
Worker: Performed "something", transferring back
Manager: worker returned "Something"
Resuming the manager
Manager: finished
static VALUE
rb_fiber_m_transfer(int argc, VALUE *argv, VALUE self)
{
    return rb_fiber_transfer_kw(self, argc, argv, rb_keyword_given_p());
}