class Fiber::Scheduler
这不是一个实际存在的类,而是 Scheduler 对象为了作为 Fiber.scheduler 的参数并处理非阻塞纤程而应遵循的接口文档。 另请参阅 Fiber 类文档中的“非阻塞纤程”部分,了解一些概念的解释。
Scheduler 的行为和用法应如下所示:
-
当非阻塞
Fiber中的执行到达某些阻塞操作(例如 sleep、等待进程或未就绪的 I/O)时,它会调用下面列出的调度器的某些钩子方法。 -
Scheduler会以某种方式注册当前纤程正在等待的内容,并使用Fiber.yield将控制权让给其他纤程(因此纤程将在等待其等待结束时被挂起,而同一线程中的其他纤程可以执行) -
在当前线程执行结束时,将调用调度器的方法 scheduler_close
-
调度器进入等待循环,检查所有被阻塞的纤程(它在钩子调用中已注册),并在等待的资源准备就绪时(例如,I/O 准备就绪或睡眠时间已过)恢复它们。
通过这种方式,每个单独的纤程代码都可以透明地实现并发执行。
Scheduler 的实现由 gem 提供,例如 Async。
钩子方法有
-
io_wait、io_read、io_write、io_pread、io_pwrite和io_select、io_close -
(随着 Ruby 开发人员开发出更多具有非阻塞调用的方法,列表将会扩展)
除非另有说明,否则钩子实现是强制性的:如果未实现,则尝试调用钩子的方法将失败。 为了提供向后兼容性,将来钩子将是可选的(如果由于为较旧的 Ruby 版本创建调度器而未实现,则需要此钩子的代码将不会失败,并且只会以阻塞方式运行)。
强烈建议调度器实现 fiber 方法,该方法由 Fiber.schedule 委托。
可以在 Ruby 的代码 test/fiber/scheduler.rb 中找到调度器的示例玩具实现
公共实例方法
由执行非反向 DNS 查找的任何方法调用。 最值得注意的方法是 Addrinfo.getaddrinfo,但还有许多其他方法。
该方法应返回与 hostname 解析到的 IP 地址对应的字符串数组,如果无法解析,则返回 nil。
相当详尽的所有可能调用站点列表
-
Addrinfo.getaddrinfo
-
Addrinfo.tcp
-
Addrinfo.udp
-
Addrinfo.ip
-
Addrinfo.new
-
Addrinfo.marshal_load
-
SOCKSSocket.new
-
TCPServer.new
-
TCPSocket.new
-
IPSocket.getaddress
-
TCPSocket.gethostbyname
-
UDPSocket#connect
-
UDPSocket#bind
-
UDPSocket#send
-
Socket.getaddrinfo
-
Socket.gethostbyname
-
Socket.pack_sockaddr_in
-
Socket.sockaddr_in
-
Socket.unpack_sockaddr_in
VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
VALUE arguments[] = {
hostname
};
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
由诸如 Thread.join 和 Mutex 之类的方法调用,以表示当前 Fiber 被阻塞,直到另行通知(例如 unblock)或直到 timeout 经过。
blocker 是我们正在等待的内容,仅供参考(用于调试和日志记录)。 其值没有保证。
预期返回布尔值,指定阻塞操作是否成功。
VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
return rb_funcall(scheduler, id_block, 2, blocker, timeout);
}
由 Ruby 的核心方法调用,以非阻塞方式运行阻塞操作。
建议的最小实现是
def blocking_operation_wait(work) Thread.new(&work).join end
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
struct rb_blocking_operation_wait_arguments arguments = {
.function = function,
.data = data,
.unblock_function = unblock_function,
.data2 = data2,
.flags = flags,
.state = state
};
VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_operation_wait_proc, (VALUE)&arguments);
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
}
在当前线程退出时调用。 调度器应实现此方法,以便所有等待的纤程都可以完成其执行。
建议的模式是在 close 方法中实现主事件循环。
VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
RUBY_ASSERT(ruby_thread_has_gvl_p());
VALUE result;
// The reason for calling `scheduler_close` before calling `close` is for
// legacy schedulers which implement `close` and expect the user to call
// it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
// which should call `scheduler_close`. If it were to call `close`, it
// would create an infinite loop.
result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
if (!UNDEF_P(result)) return result;
result = rb_check_funcall(scheduler, id_close, 0, NULL);
if (!UNDEF_P(result)) return result;
return Qnil;
}
Fiber.schedule 的实现。 该方法预期立即在单独的非阻塞纤程中运行给定的代码块,并返回该 Fiber。
建议的最小实现是
def fiber(&block) fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end
由 IO#pread 或 IO::Buffer#pread 调用,以从 io 的偏移量 from 处读取 length 个字节到给定 offset 处的指定 buffer 中(请参阅 IO::Buffer)。
此方法在语义上与 io_read 相同,但它允许指定要读取的偏移量,并且通常更适合在同一文件上进行异步 IO。
该方法应被视为实验性。
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
VALUE arguments[] = {
io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};
return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
}
由 IO#pwrite 或 IO::Buffer#pwrite 调用,以将 length 个字节写入 io 的偏移量 from 处,到给定 offset 处的指定 buffer 中(请参阅 IO::Buffer)。
此方法在语义上与 io_write 相同,但它允许指定要写入的偏移量,并且通常更适合在同一文件上进行异步 IO。
该方法应被视为实验性。
VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
VALUE arguments[] = {
io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};
return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
}
由 IO#read 或 IO#Buffer.read 调用,以从 io 中读取 length 个字节到给定 offset 处的指定 buffer 中(请参阅 IO::Buffer)。
length 参数是“要读取的最小长度”。 如果 IO 缓冲区大小为 8KiB,但 length 为 1024 (1KiB),则可能会读取最多 8KiB,但至少会读取 1KiB。 通常,只有在读取数据时发生错误的情况下,才会读取少于 length 的数据。
指定 length 为 0 是有效的,表示尝试至少读取一次并返回任何可用的数据。
建议的实现应尝试以非阻塞方式从 io 读取,如果 io 未准备就绪,则调用 io_wait(这将将控制权让给其他纤程)。
有关可用于返回数据的接口,请参阅 IO::Buffer。
预期返回读取的字节数,或者,如果发生错误,则返回 -errno(与系统错误代码对应的取反数)。
该方法应被视为实验性。
VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
VALUE arguments[] = {
io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};
return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}
由 IO.select 调用,以询问指定的描述符是否在指定的 timeout 内已准备好进行指定的事件。
预期返回已准备就绪的 IO 的 3 元组 Array。
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
VALUE arguments[] = {
readables, writables, exceptables, timeout
};
return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
由 IO#wait、IO#wait_readable、IO#wait_writable 调用,以询问指定的描述符是否在指定的 timeout 内已准备好进行指定的事件。
events 是 IO::READABLE、IO::WRITABLE 和 IO::PRIORITY 的位掩码。
建议的实现应注册哪个 Fiber 正在等待哪个资源,并立即调用 Fiber.yield 将控制权传递给其他纤程。 然后,在 close 方法中,调度器可能会将所有 I/O 资源分派给等待它的纤程。
预期返回立即准备就绪的事件的子集。
VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
}
由 IO#write 或 IO::Buffer#write 调用,以从给定 offset 处的指定 buffer(请参阅 IO::Buffer)向 io 写入 length 个字节。
length 参数是“要写入的最小长度”。 如果 IO 缓冲区大小为 8KiB,但指定的 length 为 1024 (1KiB),则最多将写入 8KiB,但至少将写入 1KiB。 通常,只有在写入数据时发生错误的情况下,才会写入少于 length 的数据。
指定 length 为 0 是有效的,表示尝试至少写入一次,并写入尽可能多的数据。
建议的实现应尝试以非阻塞方式写入 io,如果 io 未准备就绪,则调用 io_wait(这将将控制权让给其他纤程)。
有关可用于高效地从缓冲区获取数据的接口,请参阅 IO::Buffer。
预期返回写入的字节数,或者,如果发生错误,则返回 -errno(与系统错误代码对应的取反数)。
该方法应被视为实验性。
VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
VALUE arguments[] = {
io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};
return rb_check_funcall(scheduler, id_io_write, 4, arguments);
}
由 Kernel#sleep 和 Mutex#sleep 调用,并预期提供以非阻塞方式休眠的实现。 实现可能会将当前纤程注册到一些“哪个纤程等待到哪个时刻”的列表中,调用 Fiber.yield 来传递控制权,然后在 close 中恢复等待时间已过的纤程。
VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
}
由 Process::Status.wait 调用,以便等待指定的进程。 有关参数描述,请参阅该方法说明。
建议的最小实现
Thread.new do Process::Status.wait(pid, flags) end.value
此钩子是可选的:如果当前调度器中不存在此钩子,则 Process::Status.wait 将作为阻塞方法运行。
预期返回 Process::Status 实例。
VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
VALUE arguments[] = {
PIDT2NUM(pid), RB_INT2NUM(flags)
};
return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
由 Timeout.timeout 调用,以在给定 duration 内执行给定的 block。 它也可以由调度器或用户代码直接调用。
尝试在可能的情况下将给定 block 的执行时间限制在给定的 duration 内。当非阻塞操作导致 block 的执行时间超过指定的 duration 时,该非阻塞操作应被中断,并抛出指定的 exception_class,并使用给定的 exception_arguments 进行构造。
通常,执行超时被认为是有风险的。此实现只会中断非阻塞操作。这是设计使然,因为预期非阻塞操作可能会由于各种不可预测的原因而失败,因此应用程序应该已经能够健壮地处理这些情况,并且也暗示了超时。
然而,由于这种设计,如果 block 没有调用任何非阻塞操作,则将无法中断它。如果您希望为超时提供可预测的点,请考虑添加 +sleep(0)+。
如果代码块成功执行,将返回其结果。
异常通常会使用 Fiber#raise 抛出。
VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
VALUE arguments[] = {
timeout, exception, message
};
return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
调用以唤醒先前使用 block 阻塞的 Fiber (例如,Mutex#lock 调用 block,Mutex#unlock 调用 unblock)。调度器应使用 fiber 参数来了解哪个 fiber 被解除阻塞。
blocker 是等待的内容,但它仅供参考(用于调试和日志记录),并且不能保证与 block 的 blocker 值相同。
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
RUBY_ASSERT(rb_obj_is_fiber(fiber));
return rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
}