类 Fiber

Fiber 是 Ruby 中实现轻量级协作并发的一种基本机制。本质上,它们是一种创建代码块的方法,这些代码块可以像线程一样暂停和恢复。主要区别在于它们永远不会被抢占,并且调度必须由程序员完成,而不是由虚拟机完成。

与其他无栈轻量级并发模型不同,每个 Fiber 都带有一个栈。这使得 Fiber 可以从 Fiber 块内部深度嵌套的函数调用中暂停。请参阅 ruby(1) 手册页以配置 Fiber 栈的大小。

创建 Fiber 后,它不会自动运行。相反,必须使用 Fiber#resume 方法显式地要求它运行。Fiber 内部运行的代码可以通过调用 Fiber.yield 来放弃控制权,在这种情况下,它会将控制权交还给调用者(Fiber#resume 的调用者)。

在放弃或终止时,Fiber 会返回最后执行表达式的值。

例如

fiber = Fiber.new do
  Fiber.yield 1
  2
end

puts fiber.resume
puts fiber.resume
puts fiber.resume

产生

1
2
FiberError: dead fiber called

Fiber#resume 方法接受任意数量的参数,如果它是对 resume 的第一次调用,那么它们将作为块参数传递。否则,它们将是 Fiber.yield 调用的返回值。

示例

fiber = Fiber.new do |first|
  second = Fiber.yield first + 2
end

puts fiber.resume 10
puts fiber.resume 1_000_000
puts fiber.resume "The fiber will be dead before I can cause trouble"

产生

12
1000000
FiberError: dead fiber called

非阻塞 Fiber

非阻塞 Fiber 的概念是在 Ruby 3.0 中引入的。非阻塞 Fiber 在遇到通常会阻塞 Fiber 的操作(如 sleep 或等待另一个进程或 I/O)时,会将控制权交还给其他 Fiber,并允许调度器处理阻塞和唤醒(恢复)此 Fiber,以便它可以在可以继续时继续执行。

为了使 Fiber 具有非阻塞行为,它需要在 Fiber.new 中创建时设置 blocking: false(默认值),并且 Fiber.scheduler 应该使用 Fiber.set_scheduler 设置。如果当前线程没有设置 Fiber.scheduler,则阻塞和非阻塞纤维的行为相同。

Ruby 不提供调度器类:它应该由用户实现,并对应于 Fiber::Scheduler

还有一个 Fiber.schedule 方法,它应该立即以非阻塞方式执行给定的代码块。它的实际实现取决于调度器。

公共类方法

Fiber[key] → value click to toggle source

返回由 key 标识的纤维存储变量的值。

key 必须是符号,并且值由 Fiber#[]= 或 Fiber#store 设置。

另请参见 Fiber::[]=

static VALUE
rb_fiber_storage_aref(VALUE class, VALUE key)
{
    Check_Type(key, T_SYMBOL);

    VALUE storage = fiber_storage_get(fiber_current(), FALSE);
    if (storage == Qnil) return Qnil;

    return rb_hash_aref(storage, key);
}
Fiber[key] = value click to toggle source

value 分配给由 key 标识的纤维存储变量。如果变量不存在,则会创建它。

key 必须是 Symbol,否则会引发 TypeError

另请参见 Fiber::[]

static VALUE
rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value)
{
    Check_Type(key, T_SYMBOL);

    VALUE storage = fiber_storage_get(fiber_current(), value != Qnil);
    if (storage == Qnil) return Qnil;

    if (value == Qnil) {
        return rb_hash_delete(storage, key);
    }
    else {
        return rb_hash_aset(storage, key, value);
    }
}
blocking{|fiber| ...} → result click to toggle source

强制纤维在代码块持续时间内处于阻塞状态。返回代码块的结果。

有关详细信息,请参阅类文档中的“非阻塞纤维”部分。

VALUE
rb_fiber_blocking(VALUE class)
{
    VALUE fiber_value = rb_fiber_current();
    rb_fiber_t *fiber = fiber_ptr(fiber_value);

    // If we are already blocking, this is essentially a no-op:
    if (fiber->blocking) {
        return rb_yield(fiber_value);
    }
    else {
        return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value);
    }
}
blocking? → false or 1 click to toggle source

如果当前纤维是非阻塞的,则返回 false。如果 Fiber 是通过将 blocking: false 传递给 Fiber.new 或通过 Fiber.schedule 创建的,则它是非阻塞的。

如果当前 Fiber 处于阻塞状态,则该方法返回 1。未来的开发可能会允许返回更大的整数。

请注意,即使该方法返回 falseFiber 的行为也只有在当前线程中设置了 Fiber.scheduler 时才会不同。

有关详细信息,请参阅类文档中的“非阻塞纤维”部分。

static VALUE
rb_fiber_s_blocking_p(VALUE klass)
{
    rb_thread_t *thread = GET_THREAD();
    unsigned blocking = thread->blocking;

    if (blocking == 0)
        return Qfalse;

    return INT2NUM(blocking);
}
current → fiber click to toggle source

返回当前纤维。如果您不在纤维的上下文中运行,则此方法将返回根纤维。

static VALUE
rb_fiber_s_current(VALUE klass)
{
    return rb_fiber_current();
}
current_scheduler → obj or nil click to toggle source

如果当前线程是非阻塞的,则返回使用 FiberFiber.set_scheduler 为当前线程最后设置的 Fiber 调度器。

static VALUE
rb_fiber_current_scheduler(VALUE klass)
{
    return rb_fiber_scheduler_current();
}
new(blocking: false, storage: true) { |*args| ... } → fiber click to toggle source

创建一个新的 Fiber。最初,该线程未运行,可以使用 resume 恢复。传递给第一个 resume 调用的参数将传递给代码块。

f = Fiber.new do |initial|
   current = initial
   loop do
     puts "current: #{current.inspect}"
     current = Fiber.yield
   end
end
f.resume(100)     # prints: current: 100
f.resume(1, 2, 3) # prints: current: [1, 2, 3]
f.resume          # prints: current: nil
# ... and so on ...

如果将 blocking: false 传递给 Fiber.new并且当前线程定义了 Fiber.scheduler,则 Fiber 变成非阻塞的(请参阅类文档中的“非阻塞线程”部分)。

如果未指定 storage,则默认情况下会从当前线程继承存储的副本。这与指定 storage: true 相同。

Fiber[:x] = 1
Fiber.new do
  Fiber[:x] # => 1
  Fiber[:x] = 2
end.resume
Fiber[:x] # => 1

如果给定的 storagenil,则此函数将延迟初始化内部存储,该存储最初为空哈希。

Fiber[:x] = "Hello World"
Fiber.new(storage: nil) do
  Fiber[:x] # nil
end

否则,给定的 storage 将用作新线程的存储,并且它必须是 Hash 的实例。

显式使用 storage: true 目前处于实验阶段,将来可能会更改。

static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
    return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}
schedule { |*args| ... } → fiber click to toggle source

该方法预计会立即在单独的非阻塞线程中运行提供的代码块。

puts "Go to sleep!"

Fiber.set_scheduler(MyScheduler.new)

Fiber.schedule do
  puts "Going to sleep"
  sleep(1)
  puts "I slept well"
end

puts "Wakey-wakey, sleepyhead"

假设 MyScheduler 正确实现,该程序将生成

Go to sleep!
Going to sleep
Wakey-wakey, sleepyhead
...1 sec pause here...
I slept well

…例如,在 Fiber 内部的第一个阻塞操作(sleep(1))中,控制权将让位于外部代码(主线程),并且在该执行结束时,调度器将负责正确恢复所有阻塞的线程。

请注意,上面描述的行为是该方法预期的行为,实际行为取决于当前调度器对 Fiber::Scheduler#fiber 方法的实现。Ruby 不会强制该方法以任何特定方式执行。

如果未设置调度器,则该方法将引发 RuntimeError (No scheduler is available!)

static VALUE
rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj)
{
    return rb_fiber_s_schedule_kw(argc, argv, rb_keyword_given_p());
}
scheduler → obj 或 nil 点击切换源代码

返回当前线程最后一次使用 Fiber 的调度器 Fiber.set_scheduler 设置的调度器。如果未设置调度器(默认情况下),则返回 nil,并且非阻塞纤维的行为与阻塞纤维相同。(有关调度器概念的详细信息,请参阅类文档中的“非阻塞纤维”部分)。

static VALUE
rb_fiber_s_scheduler(VALUE klass)
{
    return rb_fiber_scheduler_get();
}
set_scheduler(scheduler) → scheduler 点击切换源代码

为当前线程设置 Fiber 调度器。如果设置了调度器,则非阻塞纤维(由 Fiber.new 使用 blocking: false 创建,或由 Fiber.schedule 创建)在可能阻塞的操作时调用该调度器的钩子方法,并且当前线程将在最终化时调用调度器的 close 方法(允许调度器正确管理所有未完成的纤维)。

scheduler 可以是任何与 Fiber::Scheduler 相对应的类的对象。它的实现由用户决定。

另请参阅类文档中的“非阻塞纤维”部分。

static VALUE
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
{
    return rb_fiber_scheduler_set(scheduler);
}
yield(args, ...) → obj 点击切换源代码

将控制权交回恢复纤维的上下文,并将传递给它的任何参数传递给它。当下次调用 resume 时,纤维将从此处继续处理。传递给下一个 resume 的任何参数将是此 Fiber.yield 表达式计算出的值。

static VALUE
rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass)
{
    return rb_fiber_yield_kw(argc, argv, rb_keyword_given_p());
}

公共实例方法

alive? → true 或 false 点击切换源代码

如果纤维仍然可以恢复(或转移),则返回 true。在完成纤维块的执行后,此方法将始终返回 false

VALUE
rb_fiber_alive_p(VALUE fiber_value)
{
    return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value)));
}
backtrace → array 点击切换源代码
backtrace(start) → array
backtrace(start, count) → array
backtrace(start..end) → array

返回纤维的当前执行堆栈。startcountend 允许仅选择回溯的一部分。

def level3
  Fiber.yield
end

def level2
  level3
end

def level1
  level2
end

f = Fiber.new { level1 }

# It is empty before the fiber started
f.backtrace
#=> []

f.resume

f.backtrace
#=> ["test.rb:2:in `yield'", "test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"]
p f.backtrace(1) # start from the item 1
#=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"]
p f.backtrace(2, 2) # start from item 2, take 2
#=> ["test.rb:6:in `level2'", "test.rb:10:in `level1'"]
p f.backtrace(1..3) # take items from 1 to 3
#=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'"]

f.resume

# It is nil after the fiber is finished
f.backtrace
#=> nil
static VALUE
rb_fiber_backtrace(int argc, VALUE *argv, VALUE fiber)
{
    return rb_vm_backtrace(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
backtrace_locations → array 点击切换源代码
backtrace_locations(start) → array
backtrace_locations(start, count) → array
backtrace_locations(start..end) → array

backtrace 相似,但将执行堆栈的每一行返回为 Thread::Backtrace::Location。接受与 backtrace 相同的参数。

f = Fiber.new { Fiber.yield }
f.resume
loc = f.backtrace_locations.first
loc.label  #=> "yield"
loc.path   #=> "test.rb"
loc.lineno #=> 1
static VALUE
rb_fiber_backtrace_locations(int argc, VALUE *argv, VALUE fiber)
{
    return rb_vm_backtrace_locations(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
blocking? → true 或 false 点击切换源代码

如果 fiber 处于阻塞状态,则返回 true,否则返回 false。如果 fiber 是通过传递 blocking: falseFiber.new 或通过 Fiber.schedule 创建的,则它是非阻塞的。

请注意,即使该方法返回 false,只有当当前线程中设置了 Fiber.scheduler 时,fiber 的行为才会有所不同。

有关详细信息,请参阅类文档中的“非阻塞纤维”部分。

VALUE
rb_fiber_blocking_p(VALUE fiber)
{
    return RBOOL(fiber_ptr(fiber)->blocking);
}
inspect()
别名:to_s
kill → nil 点击切换源代码

通过引发一个不可捕获的异常来终止 fiber。它只终止给定的 fiber,而不会终止其他 fiber,如果另一个 fiber 正在调用 resumetransfer,则返回 nil 给该 fiber。

Fiber#kill 只有在 fiber 处于 Fiber.yield 状态时才会中断另一个 fiber。如果在当前 fiber 上调用,则会在 Fiber#kill 调用点引发该异常。

如果 fiber 尚未启动,则直接过渡到终止状态。

如果 fiber 已经终止,则不执行任何操作。

如果在属于另一个线程的 fiber 上调用,则引发 FiberError

static VALUE
rb_fiber_m_kill(VALUE self)
{
    rb_fiber_t *fiber = fiber_ptr(self);

    if (fiber->killed) return Qfalse;
    fiber->killed = 1;

    if (fiber->status == FIBER_CREATED) {
        fiber->status = FIBER_TERMINATED;
    }
    else if (fiber->status != FIBER_TERMINATED) {
        if (fiber_current() == fiber) {
            fiber_check_killed(fiber);
        } else {
            fiber_raise(fiber_ptr(self), Qnil);
        }
    }

    return self;
}
raise → obj 点击切换源代码
raise(string) → obj
raise(exception [, string [, array]]) → obj

在 fiber 中上次调用 Fiber.yield 的位置引发异常。如果 fiber 尚未启动或已经运行完成,则引发 FiberError。如果 fiber 正在 yield,则恢复它。如果它正在转移,则转移到它。但是,如果它正在恢复,则引发 FiberError

如果没有参数,则引发 RuntimeError。如果只有一个 String 参数,则引发 RuntimeError,并将该字符串作为消息。否则,第一个参数应该是 Exception 类的名称(或在发送 exception 消息时返回 Exception 对象的对象)。可选的第二个参数设置与异常关联的消息,第三个参数是回调信息的数组。异常由 begin...end 块的 rescue 子句捕获。

如果在属于另一个 ThreadFiber 上调用,则引发 FiberError

static VALUE
rb_fiber_m_raise(int argc, VALUE *argv, VALUE self)
{
    return rb_fiber_raise(self, argc, argv);
}
resume(args, ...) → obj 点击切换源代码

从上次调用 Fiber.yield 的位置恢复纤程,或者如果这是第一次调用 resume 则开始运行它。传递给 resume 的参数将是 Fiber.yield 表达式的值,或者如果这是第一次调用 resume,则作为块参数传递给纤程的块。

或者,当调用 resume 时,它将评估为传递给纤程块中的下一个 Fiber.yield 语句的参数,或者如果它在没有任何 Fiber.yield 的情况下运行完成,则评估为块值。

static VALUE
rb_fiber_m_resume(int argc, VALUE *argv, VALUE fiber)
{
    return rb_fiber_resume_kw(fiber, argc, argv, rb_keyword_given_p());
}
storage → hash (dup) 点击切换源代码

返回纤程的存储哈希的副本。该方法只能在 Fiber.current 上调用。

static VALUE
rb_fiber_storage_get(VALUE self)
{
    storage_access_must_be_from_same_fiber(self);

    VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE);

    if (storage == Qnil) {
        return Qnil;
    }
    else {
        return rb_obj_dup(storage);
    }
}
storage = hash 点击切换源代码

设置纤程的存储哈希。此功能处于实验阶段,将来可能会更改。该方法只能在 Fiber.current 上调用。

使用此方法时要小心,因为您可能会无意中清除重要的纤程存储状态。您应该主要使用 Fiber::[]= 来分配存储中的特定键。

您也可以使用 Fiber.new(storage: nil) 创建一个具有空存储的纤程。

示例

while request = request_queue.pop
  # Reset the per-request state:
  Fiber.current.storage = nil
  handle_request(request)
end
static VALUE
rb_fiber_storage_set(VALUE self, VALUE value)
{
    if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) {
        rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL,
          "Fiber#storage= is experimental and may be removed in the future!");
    }

    storage_access_must_be_from_same_fiber(self);
    fiber_storage_validate(value);

    fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value);
    return value;
}
to_s() 点击切换源代码
static VALUE
fiber_to_s(VALUE fiber_value)
{
    const rb_fiber_t *fiber = fiber_ptr(fiber_value);
    const rb_proc_t *proc;
    char status_info[0x20];

    if (fiber->resuming_fiber) {
        snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status));
    }
    else {
        snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status));
    }

    if (!rb_obj_is_proc(fiber->first_proc)) {
        VALUE str = rb_any_to_s(fiber_value);
        strlcat(status_info, ">", sizeof(status_info));
        rb_str_set_len(str, RSTRING_LEN(str)-1);
        rb_str_cat_cstr(str, status_info);
        return str;
    }
    GetProcPtr(fiber->first_proc, proc);
    return rb_block_to_s(fiber_value, &proc->block, status_info);
}
也称为:inspect
transfer(args, ...) → obj 点击切换源代码

将控制权转移到另一个纤程,从它上次停止的位置恢复它,或者如果它之前没有恢复,则启动它。调用纤程将被挂起,就像在调用 Fiber.yield 时一样。

接收 transfer 调用的纤程将其视为 resume 调用。传递给 transfer 的参数被视为传递给 resume 的参数。

两种纤程控制传递方式(一种是 resumeFiber::yield,另一种是 transfer 到和从纤程)不能随意混合。

  • 如果 Fiber 的生命周期从 transfer 开始,它将永远无法 yield 或恢复控制传递,只能完成或转移回。 (它仍然可以恢复其他允许恢复的 Fiber。)

  • 如果 Fiber 的生命周期从 resume 开始,它可以 yield 或转移到另一个 Fiber,但只能以与它被赋予控制权的方式兼容的方式接收回控制权:如果它被转移,它只能被转移回来,如果它被 yield,它只能被恢复回来。 之后,它可以再次转移或 yield。

如果违反了这些规则,将引发 FiberError

对于单个 Fiber 设计,yield/resume 更易于使用(Fiber 只放弃控制权,它不需要考虑控制权被赋予谁),而 transfer 对于复杂情况更灵活,允许构建相互依赖的 Fiber 的任意图形。

示例

manager = nil # For local var to be visible inside worker block

# This fiber would be started with transfer
# It can't yield, and can't be resumed
worker = Fiber.new { |work|
  puts "Worker: starts"
  puts "Worker: Performed #{work.inspect}, transferring back"
  # Fiber.yield     # this would raise FiberError: attempt to yield on a not resumed fiber
  # manager.resume  # this would raise FiberError: attempt to resume a resumed fiber (double resume)
  manager.transfer(work.capitalize)
}

# This fiber would be started with resume
# It can yield or transfer, and can be transferred
# back or resumed
manager = Fiber.new {
  puts "Manager: starts"
  puts "Manager: transferring 'something' to worker"
  result = worker.transfer('something')
  puts "Manager: worker returned #{result.inspect}"
  # worker.resume    # this would raise FiberError: attempt to resume a transferring fiber
  Fiber.yield        # this is OK, the fiber transferred from and to, now it can yield
  puts "Manager: finished"
}

puts "Starting the manager"
manager.resume
puts "Resuming the manager"
# manager.transfer  # this would raise FiberError: attempt to transfer to a yielding fiber
manager.resume

产生

Starting the manager
Manager: starts
Manager: transferring 'something' to worker
Worker: starts
Worker: Performed "something", transferring back
Manager: worker returned "Something"
Resuming the manager
Manager: finished
static VALUE
rb_fiber_m_transfer(int argc, VALUE *argv, VALUE self)
{
    return rb_fiber_transfer_kw(self, argc, argv, rb_keyword_given_p());
}