类 Fiber::Scheduler

这不是一个现有的类,而是对 Scheduler 对象应该遵守的接口的文档,以便用作 Fiber.scheduler 的参数并处理非阻塞纤维。有关某些概念的解释,请参阅 Fiber 类文档中的“非阻塞纤维”部分。

调度程序的行为和用法应如下所示

  • 当非阻塞 Fiber 中的执行到达某些阻塞操作(例如睡眠、等待进程或非就绪 I/O)时,它会调用调度程序的一些挂钩方法,如下所示。

  • Scheduler 以某种方式注册当前纤维正在等待的内容,并使用 Fiber.yield 将控制权让给其他纤维(因此纤维将在等待其等待结束时被挂起,并且同一线程中的其他纤维可以执行)。

  • 在当前线程执行结束时,将调用调度程序的方法 scheduler_close。

  • 调度程序进入一个等待循环,检查所有阻塞的纤维(它在挂钩调用时已注册),并在等待的资源就绪时恢复它们(例如,I/O 就绪或睡眠时间已过)。

这样,每个单独的 Fiber 代码的并发执行将以透明的方式实现。

Scheduler 实现由宝石提供,例如 Async

挂钩方法是

除非另有说明,钩子实现是强制性的:如果未实现,尝试调用钩子的方法将失败。为了提供向后兼容性,将来钩子将是可选的(如果未实现,由于调度程序是为旧版本的 Ruby 创建的,需要此钩子的代码将不会失败,而只会以阻塞方式运行)。

还强烈建议调度程序实现 fiber 方法,该方法由 Fiber.schedule 代理。

调度程序的示例玩具实现可以在 Ruby 的代码中找到,位于 test/fiber/scheduler.rb

公共实例方法

address_resolve(hostname) → array_of_strings 或 nil 点击以切换源代码

由执行非反向 DNS 查找的任何方法调用。最显着的方法是 Addrinfo.getaddrinfo,但还有许多其他方法。

该方法预计将返回一个字符串数组,对应于 hostname 解析到的 IP 地址,或者如果无法解析,则返回 nil

所有可能调用站点的相当详尽的列表

  • Addrinfo.getaddrinfo

  • Addrinfo.tcp

  • Addrinfo.udp

  • Addrinfo.ip

  • Addrinfo.new

  • Addrinfo.marshal_load

  • SOCKSSocket.new

  • TCPServer.new

  • TCPSocket.new

  • IPSocket.getaddress

  • TCPSocket.gethostbyname

  • UDPSocket#connect

  • UDPSocket#bind

  • UDPSocket#send

  • Socket.getaddrinfo

  • Socket.gethostbyname

  • Socket.pack_sockaddr_in

  • Socket.sockaddr_in

  • Socket.unpack_sockaddr_in

VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
    VALUE arguments[] = {
        hostname
    };

    return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
block(blocker, timeout = nil) 点击以切换源代码

由诸如 Thread.join 之类的方法以及 Mutex 调用,以表示当前 Fiber 被阻塞,直到另行通知(例如 unblock)或直到 timeout 过期。

blocker 是我们正在等待的内容,仅供信息(用于调试和记录)。关于其值没有保证。

预计返回布尔值,指定阻塞操作是否成功。

VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
    return rb_funcall(scheduler, id_block, 2, blocker, timeout);
}
close() 点击以切换源代码

当当前线程退出时调用。调度程序预计将实现此方法,以允许所有等待的纤程完成其执行。

建议的模式是在 close 方法中实现主事件循环。

VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
    VM_ASSERT(ruby_thread_has_gvl_p());

    VALUE result;

    // The reason for calling `scheduler_close` before calling `close` is for
    // legacy schedulers which implement `close` and expect the user to call
    // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
    // which should call `scheduler_close`. If it were to call `close`, it
    // would create an infinite loop.

    result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    result = rb_check_funcall(scheduler, id_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    return Qnil;
}
fiber(&block)

Fiber.schedule 的实现。该方法预计会立即在单独的非阻塞纤程中运行给定的代码块,并返回该 Fiber

建议的最小实现是

def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end
io_pread(io, buffer, from, length, offset) → 读取长度或 -errno 点击切换源代码

IO#preadIO::Buffer#pread 调用,从 io 中偏移量为 from 的位置读取 length 字节到指定的 buffer(参见 IO::Buffer)中,偏移量为 offset

此方法在语义上与 io_read 相同,但它允许指定读取的偏移量,并且通常更适合于对同一文件的异步 IO 操作。

此方法应被视为实验性

VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
}
io_pwrite(io, buffer, from, length, offset) → 写入长度或 -errno 点击切换源代码

IO#pwriteIO::Buffer#pwrite 调用,将 length 字节写入 io 中偏移量为 from 的位置,写入到指定的 buffer(参见 IO::Buffer)中,偏移量为 offset

此方法在语义上与 io_write 相同,但它允许指定写入的偏移量,并且通常更适合于对同一文件的异步 IO 操作。

此方法应被视为实验性

VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
}
io_read(io, buffer, length, offset) → 读取长度或 -errno 点击切换源代码

IO#read 或 IO#Buffer.read 调用,从 io 中读取 length 字节到指定的 buffer(参见 IO::Buffer)中,偏移量为 offset

length 参数是“要读取的最小长度”。如果 IO 缓冲区大小为 8KiB,但 length1024(1KiB),则最多可以读取 8KiB,但至少会读取 1KiB。通常情况下,只有在读取数据时发生错误的情况下,才会读取少于 length 的数据。

指定 length 为 0 是有效的,表示尝试至少读取一次并返回任何可用数据。

建议的实现应该尝试以非阻塞方式从 io 中读取数据,如果 io 未准备好,则调用 io_wait(这将把控制权让给其他纤程)。

参见 IO::Buffer,了解可用于返回数据的接口。

预期返回读取的字节数,或者在发生错误的情况下返回 -errno(与系统错误代码相对应的负数)。

此方法应被视为实验性

VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}
io_select(readables, writables, exceptables, timeout) 点击切换源代码

IO.select 调用,以询问在指定的 timeout 内,指定描述符是否已准备好进行指定的事件。

预期返回包含已准备好的 I/O 的 3 元组 Array

VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
    VALUE arguments[] = {
        readables, writables, exceptables, timeout
    };

    return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
io_wait(io, events, timeout) 点击切换源代码

IO#waitIO#wait_readableIO#wait_writable 调用,以询问在指定的 timeout 内,指定描述符是否已准备好进行指定的事件。

eventsIO::READABLEIO::WRITABLEIO::PRIORITY 的位掩码。

建议的实现应该注册哪个 Fiber 正在等待哪些资源,并立即调用 Fiber.yield 将控制权传递给其他纤维。然后,在 close 方法中,调度程序可能会将所有 I/O 资源分派给正在等待它的纤维。

预期返回立即准备好的事件子集。

VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
    return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
}
io_write(io, buffer, length, offset) → written length or -errno 点击切换源代码

IO#writeIO::Buffer#write 调用,以将 length 字节从指定的 buffer(参见 IO::Buffer)写入 io,偏移量为 offset

length 参数是“要写入的最小长度”。如果 IO 缓冲区大小为 8KiB,但指定的 length 为 1024(1KiB),则最多写入 8KiB,但至少写入 1KiB。通常,只有在写入数据时发生错误的情况下,才会写入少于 length 的数据。

指定 length 为 0 是有效的,表示尝试至少写入一次,尽可能多地写入数据。

建议的实现应该尝试以非阻塞方式写入 io,如果 io 未准备好(这将把控制权让给其他纤维),则调用 io_wait

参见 IO::Buffer,了解可用于从缓冲区高效获取数据的接口。

预期返回写入的字节数,或在发生错误的情况下返回 -errno(与系统错误代码相对应的负数)。

此方法应被视为实验性

VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    VALUE arguments[] = {
        io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_check_funcall(scheduler, id_io_write, 4, arguments);
}
kernel_sleep(duration = nil) 点击切换源代码

Kernel#sleep 和 Mutex#sleep 调用,预期提供一种非阻塞方式的睡眠实现。实现可能会将当前的 fiber 注册到某个“哪个 fiber 等待到什么时间”列表中,调用 Fiber.yield 来传递控制权,然后在 close 中恢复等待时间已过的 fiber。

VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
    return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
}
process_wait(pid, flags) 点击切换源代码

Process::Status.wait 调用,用于等待指定的进程。有关参数说明,请参见该方法的描述。

建议的最小实现

Thread.new do
  Process::Status.wait(pid, flags)
end.value

此钩子是可选的:如果当前调度程序中没有此钩子,Process::Status.wait 将表现为阻塞方法。

预期返回一个 Process::Status 实例。

VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
    VALUE arguments[] = {
        PIDT2NUM(pid), RB_INT2NUM(flags)
    };

    return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
timeout_after(duration, exception_class, *exception_arguments, &block) → block 的结果 点击切换源代码

由 Timeout.timeout 调用,用于在给定的 duration 内执行给定的 block。调度程序或用户代码也可以直接调用它。

如果可能,尝试将给定 block 的执行时间限制在给定的 duration 内。当非阻塞操作导致 block 的执行时间超过指定的 duration 时,该非阻塞操作应通过使用给定的 exception_arguments 构造的指定 exception_class 来中断。

通常认为一般的执行超时存在风险。此实现只会中断非阻塞操作。这是设计使然,因为预期非阻塞操作可能会由于各种不可预测的原因而失败,因此应用程序应该已经能够稳健地处理这些情况,并隐含地处理超时。

但是,由于这种设计,如果 block 没有调用任何非阻塞操作,则无法中断它。如果您希望提供可预测的超时点,请考虑添加 +sleep(0)+。

如果 block 成功执行,将返回其结果。

异常通常使用 Fiber#raise 抛出。

VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
    VALUE arguments[] = {
        timeout, exception, message
    };

    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
unblock(blocker, fiber) 点击切换源代码

调用此方法来唤醒之前使用 Fiberblock 阻塞的 Fiber(例如,Mutex#lock 调用 block,而 Mutex#unlock 调用 unblock)。调度程序应使用 fiber 参数来了解哪个 fiber 被解除了阻塞。

blocker 是等待的对象,但它仅供参考(用于调试和日志记录),并且不能保证与 blockblocker 相同。

VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
    VM_ASSERT(rb_obj_is_fiber(fiber));

    return rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
}