Fiber

Fibers provide a mechanism for cooperative concurrency.

Context Switching

Fibers execute a user-provided block. During execution, the block may call Fiber.yield or Fiber#transfer to switch to another fiber. Fiber#resume continues execution from the point where Fiber.yield was called.

#!/usr/bin/env ruby

puts "1: Start program."

f = Fiber.new do
  puts "3: Entered fiber."
  Fiber.yield
  puts "5: Resumed fiber."
end

puts "2: Resume fiber first time."
f.resume

puts "4: Resume fiber second time."
f.resume

puts "6: Finished."

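This program demonstrates fiber control flow: the numbered messages are printed in order from 1 to 6 as control passes back and forth between the main program and the fiber.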
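Values can also travel across these switches. The sketch below (the variable names are only illustrative) shows that the argument to Fiber#resume becomes the block parameter on the first resume and the return value of Fiber.yield on later resumes, while the argument to Fiber.yield is what the matching Fiber#resume returns:

```ruby
fiber = Fiber.new do |first|
  # The first resume passes its argument in as the block parameter.
  second = Fiber.yield(first * 2)  # suspend, handing first * 2 back to the caller
  first + second                   # the block's final value is returned by the last resume
end

a = fiber.resume(10)  # starts the fiber with first = 10; returns 20
b = fiber.resume(5)   # Fiber.yield returns 5 inside the fiber; returns 15
p [a, b]              # [20, 15]
p fiber.alive?        # false: the block has finished
```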

Scheduler

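The scheduler interface is used to intercept blocking operations. A typical implementation would be a wrapper for a gem like EventMachine or Async. This design provides separation of concerns between the event loop implementation and application code. It also allows for layered schedulers, which can perform instrumentation.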

To set the scheduler for the current thread:

Fiber.set_scheduler(MyScheduler.new)

When the thread exits, there is an implicit call to set_scheduler:

Fiber.set_scheduler(nil)
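To make the per-thread scope and the implicit reset at thread exit concrete, here is a sketch using a hypothetical NullScheduler. It defines only the hooks that, as far as we know, Fiber.set_scheduler checks for (#block, #unblock, #kernel_sleep and #io_wait) plus #close, which it uses to record that it was closed when the thread exited:

```ruby
# Hypothetical do-nothing scheduler: just enough for Fiber.set_scheduler to
# accept it. No fiber ever enters these hooks in this example.
class NullScheduler
  attr_reader :closed

  def initialize
    @closed = false
  end

  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)
  def kernel_sleep(duration = nil) = raise(NotImplementedError)
  def io_wait(io, events, timeout) = raise(NotImplementedError)

  # Invoked when the scheduler is replaced, including at thread exit.
  def close
    @closed = true
  end
end

scheduler = NullScheduler.new
thread = Thread.new do
  Fiber.set_scheduler(scheduler)
  Fiber.scheduler.equal?(scheduler)  # true: set for this thread only
end

installed = thread.value  # joins the thread, which closes the scheduler on exit
p installed               # true
p Fiber.scheduler         # nil: the main thread has no scheduler
p scheduler.closed        # true
```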

Design

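The scheduler interface is designed to be an unopinionated, lightweight layer between user code and blocking operations. Scheduler hooks should avoid translating or converting arguments or return values. Ideally, the exact same arguments from the user code are provided directly to the scheduler hook, with no changes.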
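Because hooks receive arguments unchanged, a layered (instrumenting) scheduler can be a thin pass-through. The following is a minimal sketch; TracingScheduler, EchoScheduler and the HOOKS list are hypothetical names chosen for illustration, and only a subset of hooks is wrapped:

```ruby
# Hypothetical instrumentation layer: records every hook call, then forwards
# the exact same arguments (and returns the exact same result) to the
# wrapped scheduler.
class TracingScheduler
  HOOKS = %i[block unblock kernel_sleep io_wait fiber close].freeze

  attr_reader :calls

  def initialize(inner)
    @inner = inner
    @calls = []
  end

  HOOKS.each do |hook|
    define_method(hook) do |*arguments, **options, &callback|
      @calls << [hook, arguments]
      @inner.public_send(hook, *arguments, **options, &callback)
    end
  end
end

# Stand-in inner scheduler for the demonstration; it only echoes its input.
class EchoScheduler
  def kernel_sleep(duration = nil)
    duration
  end
end

tracer = TracingScheduler.new(EchoScheduler.new)
slept = tracer.kernel_sleep(0.5)
p slept         # 0.5
p tracer.calls  # [[:kernel_sleep, [0.5]]]
```

In real use the inner object would be a complete scheduler, and the tracer would be passed to Fiber.set_scheduler in its place.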

Interface

This is the interface you need to implement.

class Scheduler
  # Wait for the specified process ID to exit.
  # This hook is optional.
  # @parameter pid [Integer] The process ID to wait for.
  # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
  # @returns [Process::Status] A process status instance.
  def process_wait(pid, flags)
    Thread.new do
      Process::Status.wait(pid, flags)
    end.value
  end

  # Wait for the given io readiness to match the specified events within
  # the specified timeout.
  # @parameter io [IO] The io to wait on.
  # @parameter events [Integer] A bit mask of `IO::READABLE`,
  #   `IO::WRITABLE` and `IO::PRIORITY`.
  # @parameter timeout [Numeric] The amount of time to wait for the event in seconds.
  # @returns [Integer] The subset of events that are ready.
  def io_wait(io, events, timeout)
  end

  # Read from the given io into the specified buffer.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to read from.
  # @parameter buffer [IO::Buffer] The buffer to read into.
  # @parameter length [Integer] The minimum amount to read.
  def io_read(io, buffer, length)
  end

  # Write from the given buffer into the specified IO.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to write to.
  # @parameter buffer [IO::Buffer] The buffer to write from.
  # @parameter length [Integer] The minimum amount to write.
  def io_write(io, buffer, length)
  end

  # Sleep the current task for the specified duration, or forever if not
  # specified.
  # @parameter duration [Numeric] The amount of time to sleep in seconds.
  def kernel_sleep(duration = nil)
  end

  # Execute the given block. If the block execution exceeds the given timeout,
  # the specified exception `klass` will be raised. Typically, only non-blocking
  # methods which enter the scheduler will raise such exceptions.
  # @parameter duration [Integer] The amount of time to wait, after which an exception will be raised.
  # @parameter klass [Class] The exception class to raise.
  # @parameter *arguments [Array] The arguments to send to the constructor of the exception.
  # @yields {...} The user code to execute.
  def timeout_after(duration, klass, *arguments, &block)
  end

  # Resolve hostname to an array of IP addresses.
  # This hook is optional.
  # @parameter hostname [String] Example: "www.ruby-lang.org".
  # @returns [Array] An array of IPv4 and/or IPv6 address strings that the hostname resolves to.
  def address_resolve(hostname)
  end

  # Block the calling fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter timeout [Numeric | Nil] The amount of time to wait for in seconds.
  # @returns [Boolean] Whether the blocking operation was successful or not.
  def block(blocker, timeout = nil)
  end

  # Unblock the specified fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter fiber [Fiber] The fiber to unblock.
  # @reentrant Thread safe.
  def unblock(blocker, fiber)
  end

  # Intercept the creation of a non-blocking fiber.
  # @returns [Fiber]
  def fiber(&block)
    Fiber.new(blocking: false, &block)
  end

  # Invoked when the thread exits.
  def close
    self.run
  end

  def run
    # Implement event loop here.
  end
end

Additional hooks may be introduced in the future; we will use feature detection in order to enable them.
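As an illustration of how the required hooks fit together, here is a minimal single-threaded scheduler sketch. TinyScheduler is a hypothetical name, not a library class, and the sketch assumes that #unblock is only called from the scheduler's own thread, that sleep is always given a duration, and that at most one fiber waits on a given IO at a time. Each hook suspends the calling fiber with Fiber.yield, and #run (invoked from #close at thread exit) resumes fibers as their timers expire, their IO becomes ready, or they are unblocked:

```ruby
# Minimal single-threaded scheduler sketch (hypothetical, not production code).
class TinyScheduler
  def initialize
    @timers = {}   # Fiber => monotonic deadline in seconds
    @ready = []    # fibers passed to #unblock, resumed on the next tick
    @readers = {}  # IO => Fiber waiting for IO::READABLE
    @writers = {}  # IO => Fiber waiting for IO::WRITABLE
  end

  # Fiber.schedule calls this: start the fiber now; control comes back here
  # the first time it enters a hook (or when it finishes).
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def kernel_sleep(duration = nil)
    raise NotImplementedError, "sleep without a duration" if duration.nil?
    @timers[Fiber.current] = now + duration
    Fiber.yield  # #run resumes this fiber once the deadline has passed
  end

  def block(blocker, timeout = nil)
    fiber = Fiber.current
    @timers[fiber] = now + timeout if timeout
    Fiber.yield  # resumed by #run after #unblock, or when the timeout expires
  ensure
    @timers.delete(fiber)
  end

  def unblock(blocker, fiber)
    @timers.delete(fiber)
    @ready << fiber
  end

  def io_wait(io, events, timeout)
    fiber = Fiber.current
    @readers[io] = fiber if events.anybits?(IO::READABLE)
    @writers[io] = fiber if events.anybits?(IO::WRITABLE)
    @timers[fiber] = now + timeout if timeout
    Fiber.yield  # resumed with the ready events, or with nil on timeout
  ensure
    @readers.delete(io) if @readers[io].equal?(fiber)
    @writers.delete(io) if @writers[io].equal?(fiber)
    @timers.delete(fiber)
  end

  # The event loop: runs until no fiber is waiting on a timer, an IO or #unblock.
  def run
    until @timers.empty? && @ready.empty? && @readers.empty? && @writers.empty?
      deadline = @timers.values.min
      timeout = @ready.empty? ? deadline && [deadline - now, 0].max : 0
      readable, writable = IO.select(@readers.keys, @writers.keys, [], timeout)

      events = Hash.new(0)  # Fiber => mask of ready events
      readable&.each { |io| events[@readers[io]] |= IO::READABLE }
      writable&.each { |io| events[@writers[io]] |= IO::WRITABLE }
      events.each { |fiber, mask| fiber.resume(mask) }

      # Resume fibers whose deadline has passed, skipping any already woken.
      @timers.select { |_, at| at <= now }.each_key do |fiber|
        fiber.resume if @timers.delete(fiber)
      end

      ready, @ready = @ready, []
      ready.each { |fiber| fiber.resume if fiber.alive? }
    end
  end

  # Called implicitly when the thread exits (via Fiber.set_scheduler(nil)).
  def close
    run
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

log = []
Thread.new do
  Fiber.set_scheduler(TinyScheduler.new)
  Fiber.schedule { sleep(0.2); log << :slow }
  Fiber.schedule { sleep(0.1); log << :fast }
  log << :scheduled  # both fibers are now suspended inside kernel_sleep
end.join             # #close runs the event loop before the thread finishes
p log                # [:scheduled, :fast, :slow]
```

Fibers suspended in #block without a timeout and never unblocked are simply abandoned when the loop finds nothing else to do; a real scheduler would also need a thread-safe wake-up mechanism (for example, a pipe watched by IO.select) so that #unblock can be called from other threads.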

Non-blocking Execution

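The scheduler hooks are only used in special non-blocking execution contexts. Non-blocking execution contexts introduce non-determinism, because the execution of scheduler hooks may introduce context switching points into your program.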

Fiber

Fibers can be used to create non-blocking execution contexts.

Fiber.new do
  puts Fiber.current.blocking? # false

  # May invoke `Fiber.scheduler&.io_wait`.
  io.read(...)

  # May invoke `Fiber.scheduler&.io_wait`.
  io.write(...)

  # Will invoke `Fiber.scheduler&.kernel_sleep`.
  sleep(n)
end.resume

We also introduce a new method which simplifies the creation of these non-blocking fibers:

Fiber.schedule do
  puts Fiber.current.blocking? # false
end

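The purpose of this method is to allow the scheduler to decide internally when to start the fiber, and whether to use symmetric or asymmetric fibers.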

You can also create blocking execution contexts:

Fiber.new(blocking: true) do
  # Won't use the scheduler:
  sleep(n)
end

However, you should generally avoid this unless you are implementing a scheduler.
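The difference between these contexts can be checked directly. The sketch below, written against a recent Ruby 3.x, compares Fiber#blocking? for a default fiber and for one created with blocking: true, and shows that Fiber.schedule raises (we expect a RuntimeError) when the current thread has no scheduler:

```ruby
default_fiber  = Fiber.new { Fiber.current.blocking? }
blocking_fiber = Fiber.new(blocking: true) { Fiber.current.blocking? }

non_blocking_result = default_fiber.resume  # false: Fiber.new defaults to blocking: false
blocking_result = blocking_fiber.resume     # true

# Fiber.schedule delegates to the thread's scheduler; here there is none.
schedule_error = begin
  Fiber.schedule { }
  nil
rescue RuntimeError => error
  error
end

p non_blocking_result   # false
p blocking_result       # true
p schedule_error.class  # RuntimeError
```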

IO

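By default, I/O is non-blocking. Not all operating systems support non-blocking I/O; Windows is a notable example, where socket I/O can be non-blocking but pipe I/O is blocking. Provided that there is a scheduler and the current fiber is non-blocking, the operation will invoke the scheduler.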

Mutex

The Mutex class can be used in a non-blocking context and is fiber specific.
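"Fiber specific" means that a lock is owned by a fiber rather than by a whole thread. A small sketch (no scheduler involved) makes this visible: a second fiber on the same thread neither owns the lock nor can acquire it with Mutex#try_lock:

```ruby
mutex = Thread::Mutex.new
mutex.lock  # the lock is now held by the current fiber

# A second fiber on the same thread neither owns the lock nor can take it.
owned_elsewhere, acquired_elsewhere = Fiber.new { [mutex.owned?, mutex.try_lock] }.resume
owned_here = mutex.owned?
mutex.unlock

p owned_here          # true
p owned_elsewhere     # false
p acquired_elsewhere  # false
```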

ConditionVariable

The ConditionVariable class can be used in a non-blocking context and is fiber specific.

Queue / SizedQueue

The Queue and SizedQueue classes can be used in a non-blocking context and are fiber specific.
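With a scheduler installed, a fiber that pops from an empty queue is suspended through the scheduler's #block hook instead of blocking the thread, and a later push wakes it through #unblock. The sketch below assumes a hypothetical ReadyListScheduler that supports only same-thread block/unblock (timeouts are ignored, sleep and IO waits are not implemented):

```ruby
# Hypothetical scheduler supporting only same-thread block/unblock.
class ReadyListScheduler
  def initialize
    @ready = []  # fibers handed to #unblock, resumed in FIFO order
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def block(blocker, timeout = nil)
    Fiber.yield  # the timeout is ignored in this sketch
  end

  def unblock(blocker, fiber)
    @ready << fiber
  end

  def kernel_sleep(duration = nil) = raise(NotImplementedError)
  def io_wait(io, events, timeout) = raise(NotImplementedError)

  # Runs at thread exit: resume unblocked fibers until none remain.
  def close
    @ready.shift.resume until @ready.empty?
  end
end

results = []
Thread.new do
  Fiber.set_scheduler(ReadyListScheduler.new)
  queue = Thread::Queue.new
  Fiber.schedule { 3.times { results << queue.pop } }  # parks on the empty queue
  Fiber.schedule { 3.times { |i| queue << i } }        # wakes the consumer via #unblock
end.join
p results  # [0, 1, 2]
```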

Thread

The Thread#join operation can be used in a non-blocking context and is fiber specific.