类 Fiber
Fiber 是 Ruby 中实现轻量级协作并发的一种基本机制。本质上,它们是一种创建代码块的方法,这些代码块可以像线程一样暂停和恢复。主要区别在于它们永远不会被抢占,并且调度必须由程序员完成,而不是由虚拟机完成。
与其他无栈轻量级并发模型不同,每个 Fiber 都带有一个栈。这使得 Fiber 可以从 Fiber 块内部深度嵌套的函数调用中暂停。请参阅 ruby(1) 手册页以配置 Fiber 栈的大小。
创建 Fiber 后,它不会自动运行。相反,必须使用 Fiber#resume
方法显式地要求它运行。Fiber 内部运行的代码可以通过调用 Fiber.yield
来放弃控制权,在这种情况下,它会将控制权交还给调用者(Fiber#resume
的调用者)。
在放弃或终止时,Fiber
会返回最后执行表达式的值。
例如
fiber = Fiber.new do Fiber.yield 1 2 end puts fiber.resume puts fiber.resume puts fiber.resume
产生
1 2 FiberError: dead fiber called
Fiber#resume
方法接受任意数量的参数,如果它是对 resume
的第一次调用,那么它们将作为块参数传递。否则,它们将是 Fiber.yield
调用的返回值。
示例
fiber = Fiber.new do |first| second = Fiber.yield first + 2 end puts fiber.resume 10 puts fiber.resume 1_000_000 puts fiber.resume "The fiber will be dead before I can cause trouble"
产生
12 1000000 FiberError: dead fiber called
非阻塞 Fiber¶ ↑
非阻塞 Fiber 的概念是在 Ruby 3.0 中引入的。非阻塞 Fiber 在遇到通常会阻塞 Fiber 的操作(如 sleep
或等待另一个进程或 I/O)时,会将控制权交还给其他 Fiber,并允许调度器处理阻塞和唤醒(恢复)此 Fiber,以便它可以在可以继续时继续执行。
为了使 Fiber
具有非阻塞行为,它需要在 Fiber.new
中创建时设置 blocking: false
(默认值),并且 Fiber.scheduler
应该使用 Fiber.set_scheduler
设置。如果当前线程没有设置 Fiber.scheduler
,则阻塞和非阻塞纤维的行为相同。
Ruby 不提供调度器类:它应该由用户实现,并对应于 Fiber::Scheduler
。
还有一个 Fiber.schedule
方法,它应该立即以非阻塞方式执行给定的代码块。它的实际实现取决于调度器。
公共类方法
返回由 key
标识的纤维存储变量的值。
key
必须是符号,并且值由 Fiber#[]= 或 Fiber#store 设置。
另请参见 Fiber::[]=
。
static VALUE rb_fiber_storage_aref(VALUE class, VALUE key) { Check_Type(key, T_SYMBOL); VALUE storage = fiber_storage_get(fiber_current(), FALSE); if (storage == Qnil) return Qnil; return rb_hash_aref(storage, key); }
将 value
分配给由 key
标识的纤维存储变量。如果变量不存在,则会创建它。
key
必须是 Symbol
,否则会引发 TypeError
。
另请参见 Fiber::[]
。
static VALUE rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value) { Check_Type(key, T_SYMBOL); VALUE storage = fiber_storage_get(fiber_current(), value != Qnil); if (storage == Qnil) return Qnil; if (value == Qnil) { return rb_hash_delete(storage, key); } else { return rb_hash_aset(storage, key, value); } }
强制纤维在代码块持续时间内处于阻塞状态。返回代码块的结果。
有关详细信息,请参阅类文档中的“非阻塞纤维”部分。
VALUE rb_fiber_blocking(VALUE class) { VALUE fiber_value = rb_fiber_current(); rb_fiber_t *fiber = fiber_ptr(fiber_value); // If we are already blocking, this is essentially a no-op: if (fiber->blocking) { return rb_yield(fiber_value); } else { return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value); } }
如果当前纤维是非阻塞的,则返回 false
。如果 Fiber
是通过将 blocking: false
传递给 Fiber.new
或通过 Fiber.schedule
创建的,则它是非阻塞的。
如果当前 Fiber
处于阻塞状态,则该方法返回 1。未来的开发可能会允许返回更大的整数。
请注意,即使该方法返回 false
,Fiber
的行为也只有在当前线程中设置了 Fiber.scheduler
时才会不同。
有关详细信息,请参阅类文档中的“非阻塞纤维”部分。
static VALUE rb_fiber_s_blocking_p(VALUE klass) { rb_thread_t *thread = GET_THREAD(); unsigned blocking = thread->blocking; if (blocking == 0) return Qfalse; return INT2NUM(blocking); }
返回当前纤维。如果您不在纤维的上下文中运行,则此方法将返回根纤维。
static VALUE rb_fiber_s_current(VALUE klass) { return rb_fiber_current(); }
如果当前线程是非阻塞的,则返回使用 Fiber
的 Fiber.set_scheduler
为当前线程最后设置的 Fiber
调度器。
static VALUE rb_fiber_current_scheduler(VALUE klass) { return rb_fiber_scheduler_current(); }
创建一个新的 Fiber
。最初,该线程未运行,可以使用 resume
恢复。传递给第一个 resume
调用的参数将传递给代码块。
f = Fiber.new do |initial| current = initial loop do puts "current: #{current.inspect}" current = Fiber.yield end end f.resume(100) # prints: current: 100 f.resume(1, 2, 3) # prints: current: [1, 2, 3] f.resume # prints: current: nil # ... and so on ...
如果将 blocking: false
传递给 Fiber.new
,并且当前线程定义了 Fiber.scheduler
,则 Fiber
变成非阻塞的(请参阅类文档中的“非阻塞线程”部分)。
如果未指定 storage
,则默认情况下会从当前线程继承存储的副本。这与指定 storage: true
相同。
Fiber[:x] = 1 Fiber.new do Fiber[:x] # => 1 Fiber[:x] = 2 end.resume Fiber[:x] # => 1
如果给定的 storage
为 nil
,则此函数将延迟初始化内部存储,该存储最初为空哈希。
Fiber[:x] = "Hello World" Fiber.new(storage: nil) do Fiber[:x] # nil end
否则,给定的 storage
将用作新线程的存储,并且它必须是 Hash
的实例。
显式使用 storage: true
目前处于实验阶段,将来可能会更改。
static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) { return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p()); }
该方法预计会立即在单独的非阻塞线程中运行提供的代码块。
puts "Go to sleep!" Fiber.set_scheduler(MyScheduler.new) Fiber.schedule do puts "Going to sleep" sleep(1) puts "I slept well" end puts "Wakey-wakey, sleepyhead"
假设 MyScheduler 正确实现,该程序将生成
Go to sleep! Going to sleep Wakey-wakey, sleepyhead ...1 sec pause here... I slept well
…例如,在 Fiber
内部的第一个阻塞操作(sleep(1)
)中,控制权将让位于外部代码(主线程),并且在该执行结束时,调度器将负责正确恢复所有阻塞的线程。
请注意,上面描述的行为是该方法预期的行为,实际行为取决于当前调度器对 Fiber::Scheduler#fiber
方法的实现。Ruby 不会强制该方法以任何特定方式执行。
如果未设置调度器,则该方法将引发 RuntimeError (No scheduler is available!)
。
static VALUE rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj) { return rb_fiber_s_schedule_kw(argc, argv, rb_keyword_given_p()); }
返回当前线程最后一次使用 Fiber
的调度器 Fiber.set_scheduler
设置的调度器。如果未设置调度器(默认情况下),则返回 nil
,并且非阻塞纤维的行为与阻塞纤维相同。(有关调度器概念的详细信息,请参阅类文档中的“非阻塞纤维”部分)。
static VALUE rb_fiber_s_scheduler(VALUE klass) { return rb_fiber_scheduler_get(); }
为当前线程设置 Fiber
调度器。如果设置了调度器,则非阻塞纤维(由 Fiber.new
使用 blocking: false
创建,或由 Fiber.schedule
创建)在可能阻塞的操作时调用该调度器的钩子方法,并且当前线程将在最终化时调用调度器的 close
方法(允许调度器正确管理所有未完成的纤维)。
scheduler
可以是任何与 Fiber::Scheduler
相对应的类的对象。它的实现由用户决定。
另请参阅类文档中的“非阻塞纤维”部分。
static VALUE rb_fiber_set_scheduler(VALUE klass, VALUE scheduler) { return rb_fiber_scheduler_set(scheduler); }
将控制权交回恢复纤维的上下文,并将传递给它的任何参数传递给它。当下次调用 resume
时,纤维将从此处继续处理。传递给下一个 resume
的任何参数将是此 Fiber.yield
表达式计算出的值。
static VALUE rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass) { return rb_fiber_yield_kw(argc, argv, rb_keyword_given_p()); }
公共实例方法
如果纤维仍然可以恢复(或转移),则返回 true。在完成纤维块的执行后,此方法将始终返回 false
。
VALUE rb_fiber_alive_p(VALUE fiber_value) { return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value))); }
返回纤维的当前执行堆栈。start
、count
和 end
允许仅选择回溯的一部分。
def level3 Fiber.yield end def level2 level3 end def level1 level2 end f = Fiber.new { level1 } # It is empty before the fiber started f.backtrace #=> [] f.resume f.backtrace #=> ["test.rb:2:in `yield'", "test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"] p f.backtrace(1) # start from the item 1 #=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"] p f.backtrace(2, 2) # start from item 2, take 2 #=> ["test.rb:6:in `level2'", "test.rb:10:in `level1'"] p f.backtrace(1..3) # take items from 1 to 3 #=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'"] f.resume # It is nil after the fiber is finished f.backtrace #=> nil
static VALUE rb_fiber_backtrace(int argc, VALUE *argv, VALUE fiber) { return rb_vm_backtrace(argc, argv, &fiber_ptr(fiber)->cont.saved_ec); }
与 backtrace
相似,但将执行堆栈的每一行返回为 Thread::Backtrace::Location
。接受与 backtrace
相同的参数。
f = Fiber.new { Fiber.yield } f.resume loc = f.backtrace_locations.first loc.label #=> "yield" loc.path #=> "test.rb" loc.lineno #=> 1
static VALUE rb_fiber_backtrace_locations(int argc, VALUE *argv, VALUE fiber) { return rb_vm_backtrace_locations(argc, argv, &fiber_ptr(fiber)->cont.saved_ec); }
如果 fiber
处于阻塞状态,则返回 true
,否则返回 false
。如果 fiber
是通过传递 blocking: false
给 Fiber.new
或通过 Fiber.schedule
创建的,则它是非阻塞的。
请注意,即使该方法返回 false
,只有当当前线程中设置了 Fiber.scheduler
时,fiber 的行为才会有所不同。
有关详细信息,请参阅类文档中的“非阻塞纤维”部分。
VALUE rb_fiber_blocking_p(VALUE fiber) { return RBOOL(fiber_ptr(fiber)->blocking); }
通过引发一个不可捕获的异常来终止 fiber。它只终止给定的 fiber,而不会终止其他 fiber,如果另一个 fiber 正在调用 resume
或 transfer
,则返回 nil
给该 fiber。
Fiber#kill
只有在 fiber 处于 Fiber.yield
状态时才会中断另一个 fiber。如果在当前 fiber 上调用,则会在 Fiber#kill
调用点引发该异常。
如果 fiber 尚未启动,则直接过渡到终止状态。
如果 fiber 已经终止,则不执行任何操作。
如果在属于另一个线程的 fiber 上调用,则引发 FiberError
。
static VALUE rb_fiber_m_kill(VALUE self) { rb_fiber_t *fiber = fiber_ptr(self); if (fiber->killed) return Qfalse; fiber->killed = 1; if (fiber->status == FIBER_CREATED) { fiber->status = FIBER_TERMINATED; } else if (fiber->status != FIBER_TERMINATED) { if (fiber_current() == fiber) { fiber_check_killed(fiber); } else { fiber_raise(fiber_ptr(self), Qnil); } } return self; }
在 fiber 中上次调用 Fiber.yield
的位置引发异常。如果 fiber 尚未启动或已经运行完成,则引发 FiberError
。如果 fiber 正在 yield,则恢复它。如果它正在转移,则转移到它。但是,如果它正在恢复,则引发 FiberError
。
如果没有参数,则引发 RuntimeError
。如果只有一个 String
参数,则引发 RuntimeError
,并将该字符串作为消息。否则,第一个参数应该是 Exception
类的名称(或在发送 exception
消息时返回 Exception
对象的对象)。可选的第二个参数设置与异常关联的消息,第三个参数是回调信息的数组。异常由 begin...end
块的 rescue
子句捕获。
如果在属于另一个 Thread
的 Fiber
上调用,则引发 FiberError
。
static VALUE rb_fiber_m_raise(int argc, VALUE *argv, VALUE self) { return rb_fiber_raise(self, argc, argv); }
从上次调用 Fiber.yield
的位置恢复纤程,或者如果这是第一次调用 resume
则开始运行它。传递给 resume 的参数将是 Fiber.yield
表达式的值,或者如果这是第一次调用 resume
,则作为块参数传递给纤程的块。
或者,当调用 resume 时,它将评估为传递给纤程块中的下一个 Fiber.yield
语句的参数,或者如果它在没有任何 Fiber.yield
的情况下运行完成,则评估为块值。
static VALUE rb_fiber_m_resume(int argc, VALUE *argv, VALUE fiber) { return rb_fiber_resume_kw(fiber, argc, argv, rb_keyword_given_p()); }
返回纤程的存储哈希的副本。该方法只能在 Fiber.current
上调用。
static VALUE rb_fiber_storage_get(VALUE self) { storage_access_must_be_from_same_fiber(self); VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE); if (storage == Qnil) { return Qnil; } else { return rb_obj_dup(storage); } }
设置纤程的存储哈希。此功能处于实验阶段,将来可能会更改。该方法只能在 Fiber.current
上调用。
使用此方法时要小心,因为您可能会无意中清除重要的纤程存储状态。您应该主要使用 Fiber::[]=
来分配存储中的特定键。
您也可以使用 Fiber.new(storage: nil)
创建一个具有空存储的纤程。
示例
while request = request_queue.pop # Reset the per-request state: Fiber.current.storage = nil handle_request(request) end
static VALUE rb_fiber_storage_set(VALUE self, VALUE value) { if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) { rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL, "Fiber#storage= is experimental and may be removed in the future!"); } storage_access_must_be_from_same_fiber(self); fiber_storage_validate(value); fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value); return value; }
static VALUE fiber_to_s(VALUE fiber_value) { const rb_fiber_t *fiber = fiber_ptr(fiber_value); const rb_proc_t *proc; char status_info[0x20]; if (fiber->resuming_fiber) { snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status)); } else { snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status)); } if (!rb_obj_is_proc(fiber->first_proc)) { VALUE str = rb_any_to_s(fiber_value); strlcat(status_info, ">", sizeof(status_info)); rb_str_set_len(str, RSTRING_LEN(str)-1); rb_str_cat_cstr(str, status_info); return str; } GetProcPtr(fiber->first_proc, proc); return rb_block_to_s(fiber_value, &proc->block, status_info); }
将控制权转移到另一个纤程,从它上次停止的位置恢复它,或者如果它之前没有恢复,则启动它。调用纤程将被挂起,就像在调用 Fiber.yield
时一样。
接收 transfer 调用的纤程将其视为 resume 调用。传递给 transfer 的参数被视为传递给 resume 的参数。
两种纤程控制传递方式(一种是 resume
和 Fiber::yield
,另一种是 transfer
到和从纤程)不能随意混合。
-
如果 Fiber 的生命周期从 transfer 开始,它将永远无法 yield 或恢复控制传递,只能完成或转移回。 (它仍然可以恢复其他允许恢复的 Fiber。)
-
如果 Fiber 的生命周期从 resume 开始,它可以 yield 或转移到另一个
Fiber
,但只能以与它被赋予控制权的方式兼容的方式接收回控制权:如果它被转移,它只能被转移回来,如果它被 yield,它只能被恢复回来。 之后,它可以再次转移或 yield。
如果违反了这些规则,将引发 FiberError
。
对于单个 Fiber
设计,yield/resume 更易于使用(Fiber
只放弃控制权,它不需要考虑控制权被赋予谁),而 transfer 对于复杂情况更灵活,允许构建相互依赖的 Fiber 的任意图形。
示例
manager = nil # For local var to be visible inside worker block # This fiber would be started with transfer # It can't yield, and can't be resumed worker = Fiber.new { |work| puts "Worker: starts" puts "Worker: Performed #{work.inspect}, transferring back" # Fiber.yield # this would raise FiberError: attempt to yield on a not resumed fiber # manager.resume # this would raise FiberError: attempt to resume a resumed fiber (double resume) manager.transfer(work.capitalize) } # This fiber would be started with resume # It can yield or transfer, and can be transferred # back or resumed manager = Fiber.new { puts "Manager: starts" puts "Manager: transferring 'something' to worker" result = worker.transfer('something') puts "Manager: worker returned #{result.inspect}" # worker.resume # this would raise FiberError: attempt to resume a transferring fiber Fiber.yield # this is OK, the fiber transferred from and to, now it can yield puts "Manager: finished" } puts "Starting the manager" manager.resume puts "Resuming the manager" # manager.transfer # this would raise FiberError: attempt to transfer to a yielding fiber manager.resume
产生
Starting the manager Manager: starts Manager: transferring 'something' to worker Worker: starts Worker: Performed "something", transferring back Manager: worker returned "Something" Resuming the manager Manager: finished
static VALUE rb_fiber_m_transfer(int argc, VALUE *argv, VALUE self) { return rb_fiber_transfer_kw(self, argc, argv, rb_keyword_given_p()); }