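Ractor - Ruby's Actor-like concurrent abstraction

Ractor is designed to provide a parallel execution feature for Ruby without thread-safety concerns.

Summary

Multiple Ractors in an interpreter process

You can create multiple Ractors, and they run in parallel.

Limited sharing between multiple Ractors

Unlike threads, Ractors don't share everything.

Two types of communication between Ractors

Ractors communicate with each other and synchronize their execution by exchanging messages. There are two message exchange protocols: push type (message passing) and pull type.

Copy & Move semantics to send messages

To send unshareable objects as messages, the objects are copied or moved.

Thread-safety

Ractor helps you write thread-safe concurrent programs, but it is still possible to write thread-unsafe programs with Ractors.

Without Ractor, we need to trace all state mutations to debug thread-safety issues. With Ractor, you can concentrate on the suspicious code that is shared between Ractors.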

Creation and termination

Ractor.new

# Ractor.new with a block creates new Ractor
r = Ractor.new do
  # This block will be run in parallel with other ractors
end

# You can name a Ractor with `name:` argument.
r = Ractor.new name: 'test-name' do
end

# and Ractor#name returns its name.
r.name #=> 'test-name'

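Given block isolation

The Ractor executes the given expr in the given block. The given block is isolated from its outer scope by the Proc#isolate method (not yet exposed to Ruby users). To prevent sharing unshareable objects between Ractors, the block's outer variables, self, and other information are isolated.

Proc#isolate is called at Ractor creation time (when Ractor.new is called). If the given Proc object cannot be isolated, for example because it accesses outer variables, an error is raised.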

begin
  a = true
  r = Ractor.new do
    a #=> ArgumentError because this block accesses `a`.
  end
  r.take # see later
rescue ArgumentError
end

r = Ractor.new do
  p self.class #=> Ractor
  self.object_id
end
r.take == self.object_id #=> false
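The isolation rule means a block cannot capture outer locals, so values are usually handed in as Ractor.new arguments instead. The following is a minimal sketch of both cases; the names `captured_error`, `parent`, and `result` are illustrative, and the value is pushed back to the main Ractor with send/Ractor.receive rather than take.

```ruby
a = true

# Capturing the outer local `a` is rejected when Ractor.new is called.
captured_error =
  begin
    Ractor.new { a }
    nil
  rescue ArgumentError => e
    e
  end

# Passing `a` (and the main Ractor) as arguments is allowed.
Ractor.new(a, Ractor.current) do |value, parent|
  parent.send(value) # push the value back to the main Ractor
end

result = Ractor.receive
```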

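Arguments passed to Ractor.new() become block parameters of the given block. However, the interpreter does not pass references to the argument objects; it sends them as messages (see below for details).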

r = Ractor.new 'ok' do |msg|
  msg #=> 'ok'
end
r.take #=> 'ok'
# almost similar to the last example
r = Ractor.new do
  msg = Ractor.receive
  msg
end
r.send 'ok'
r.take #=> 'ok'
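A small sketch, assuming the copy semantics described later, of what "sent as a message" means for an unshareable argument: the block mutates its own deep copy, and the caller's Array is untouched. The main Ractor is passed as an extra (shareable) argument so that the result can be pushed back.

```ruby
list = [1, 2]

# The main Ractor (shareable) is passed by reference, while the unshareable
# `list` arrives in the new Ractor as a deep copy.
Ractor.new(Ractor.current, list) do |parent, copy|
  copy << 3         # mutates only the copy
  parent.send(copy) # the reply is copied again on the way back
end

returned = Ractor.receive
```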

An execution result of the given block

The return value of the given block becomes an outgoing message (see below for details).

r = Ractor.new do
  'ok'
end
r.take #=> 'ok'
# almost similar to the last example
r = Ractor.new do
  Ractor.yield 'ok'
end
r.take #=> 'ok'

An error raised in the given block is propagated to the receiver of the outgoing message.

r = Ractor.new do
  raise 'ok' # exception will be transferred to the receiver
end

begin
  r.take
rescue Ractor::RemoteError => e
  e.cause.class   #=> RuntimeError
  e.cause.message #=> 'ok'
  e.ractor        #=> r
end
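If a caller would rather not rely on exception propagation, one option (a pattern of our own, not something the Ractor API requires) is to rescue inside the Ractor and push a plain description of the error. The `[:ok, value]` / `[:error, class_name, message]` tuple shape is a hypothetical convention.

```ruby
Ractor.new(Ractor.current) do |parent|
  begin
    value = Integer('not a number') # raises ArgumentError
    parent.send([:ok, value])
  rescue => e
    # Send plain strings rather than the exception object itself.
    parent.send([:error, e.class.name, e.message])
  end
end

status, error_class, error_message = Ractor.receive
```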

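Communication between Ractors

Communication between Ractors is achieved by sending and receiving messages. There are two ways to communicate with each other: (1) message sending/receiving, and (2) using shareable container objects.

Users can control program execution timing with (1), but should not control it with (2) (use (2) only to manage critical sections).

For message sending and receiving, there are two types of APIs: push type and pull type.

Sending/Receiving ports

Each Ractor has an incoming-port and an outgoing-port. The incoming-port is connected to an incoming-queue of infinite size.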

Ractor r
                 +-------------------------------------------+
                 | incoming                         outgoing |
                 | port                                 port |
   r.send(obj) ->*->[incoming queue]     Ractor.yield(obj) ->*-> r.take
                 |                |                          |
                 |                v                          |
                 |           Ractor.receive                  |
                 +-------------------------------------------+


Connection example: r2.send obj on r1, Ractor.receive on r2
  +----+     +----+
  * r1 |---->* r2 *
  +----+     +----+


Connection example: Ractor.yield(obj) on r1, r1.take on r2
  +----+     +----+
  * r1 *---->- r2 *
  +----+     +----+

Connection example: Ractor.yield(obj) on r1 and r2,
                    and waiting for both simultaneously by Ractor.select(r1, r2)

  +----+
  * r1 *------+
  +----+      |
              +----> Ractor.select(r1, r2)
  +----+      |
  * r2 *------+
  +----+

r = Ractor.new do
  msg = Ractor.receive # Receive from r's incoming queue
  msg # send back msg as block return value
end
r.send 'ok' # Send 'ok' to r's incoming port -> incoming queue
r.take      # Receive from r's outgoing port

The last example shows the following Ractor network.

  +------+        +---+
  * main |------> * r *---+
  +------+        +---+   |
      ^                   |
      +-------------------+

This code can be simplified by passing an argument to Ractor.new.

# Actual argument 'ok' for `Ractor.new()` will be sent to created Ractor.
r = Ractor.new 'ok' do |msg|
  # Values for formal parameters will be received from incoming queue.
  # Similar to: msg = Ractor.receive

  msg # Return value of the given block will be sent via outgoing port
end

# receive from the r's outgoing port.
r.take #=> 'ok'
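Push-type messaging works in both directions when the sender includes its own Ractor as a reply address. The sketch below is a hypothetical echo service built only on Ractor#send and Ractor.receive; the `[sender, text]` pair format and the `:stop` sentinel are assumptions of this example.

```ruby
main = Ractor.current

# Hypothetical echo service: each message is a [sender, text] pair, and the
# reply is pushed to the sender's incoming queue.
echo = Ractor.new do
  loop do
    sender, text = Ractor.receive
    break if text == :stop
    sender.send("echo: #{text}")
  end
end

# Wait for each reply before sending the next request, so the order is fixed.
replies = %w[a b c].map do |text|
  echo.send([main, text])
  Ractor.receive
end

echo.send([main, :stop]) # let the echo Ractor finish
```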

Return value of a block for Ractor.new

As already explained, the return value of Ractor.new (the evaluated value of expr in Ractor.new{ expr }) can be taken by Ractor#take.

Ractor.new{ 42 }.take #=> 42

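When the block return value is available, the Ractor is already dead, so no Ractor except the taking Ractor can touch the return value. Therefore, any value can be sent via this communication path without any modification.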

r = Ractor.new do
  a = "hello"
  binding
end

r.take.eval("p a") #=> "hello" (other communication path can not send a Binding object directly)

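Wait for multiple Ractors with Ractor.select

You can wait for the yield of multiple Ractors with Ractor.select(*ractors). The return value of Ractor.select() is [r, msg], where r is the yielding Ractor and msg is the yielded value.

Wait for a single Ractor (same as Ractor#take):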

r1 = Ractor.new{'r1'}

r, obj = Ractor.select(r1)
r == r1 and obj == 'r1' #=> true

Wait for two Ractors:

r1 = Ractor.new{'r1'}
r2 = Ractor.new{'r2'}
rs = [r1, r2]
as = []

# Wait for r1 or r2's Ractor.yield
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj

# Second try (rs only contain not-closed ractors)
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj
as.sort == ['r1', 'r2'] #=> true
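With push-type communication, the main Ractor can also wait for several Ractors without Ractor.select by letting each one send a tagged result to it. This is a sketch; the `[tag, value]` format is an assumption, and because arrival order is not deterministic the results are collected into a Hash.

```ruby
main = Ractor.current

# Each Ractor pushes a [tag, value] pair to the main Ractor's incoming queue.
workers = %w[r1 r2].map do |name|
  Ractor.new(main, name) do |parent, tag|
    parent.send([tag, tag.upcase])
  end
end

# Arrival order is not deterministic, so key the results by tag.
results = {}
workers.size.times do
  tag, value = Ractor.receive
  results[tag] = value
end
```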

Complex example:

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
rs = RN.times.map{|i|
  Ractor.new pipe, i do |pipe, i|
    msg = pipe.take
    msg # ping-pong
  end
}
RN.times{|i|
  pipe << i
}
RN.times.map{
  r, n = Ractor.select(*rs)
  rs.delete r
  n
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Multiple Ractors can send messages to one Ractor.

# Create 10 ractors and they send objects to pipe ractor.
# pipe ractor yield received objects

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
rs = RN.times.map{|i|
  Ractor.new pipe, i do |pipe, i|
    pipe << i
  end
}

RN.times.map{
  pipe.take
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

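TODO: The current Ractor.select() has the same issue as select(2), so this interface should be refined.

TODO: The select syntax of the Go language uses a round-robin technique for fair scheduling. Ractor.select() does not use it yet.

Closing Ractor's ports

Example (try to take from a closed Ractor):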

r = Ractor.new do
  'finish'
end
r.take # success (will return 'finish')
begin
  o = r.take # try to take from closed Ractor
rescue Ractor::ClosedError
  'ok'
else
  "ng: #{o}"
end

Example (try to send to a closed (terminated) Ractor):

r = Ractor.new do
end

r.take # wait terminate

begin
  r.send(1)
rescue Ractor::ClosedError
  'ok'
else
  'ng'
end

When multiple Ractors are waiting for Ractor.yield(), Ractor#close_outgoing cancels all of the blocking by raising an exception (ClosedError).

Send a message by copying

Ractor#send(obj) or Ractor.yield(obj) deep-copies obj if obj is an unshareable object.

obj = 'str'.dup
r = Ractor.new obj do |msg|
  # return received msg's object_id
  msg.object_id
end

obj.object_id == r.take #=> false

Some objects do not support copying their value and raise an exception.

obj = Thread.new{}
begin
  Ractor.new obj do |msg|
    msg
  end
rescue TypeError => e
  e.message #=> 'allocator undefined for Thread'
else
  'ng' # unreachable here
end
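Copying is deep: nested objects reachable from the message are copied too. A minimal sketch (the variable names are illustrative):

```ruby
original = [[1, 2], String.new('str')]

Ractor.new(Ractor.current, original) do |parent, copy|
  copy[0] << 3     # nested Array: a copy, not the caller's object
  copy[1] << '!'   # nested String: also a copy
  parent.send(copy)
end

received = Ractor.receive
```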

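Send a message by moving

Ractor#send(obj, move: true) or Ractor.yield(obj, move: true) moves obj to the destination Ractor. If the source Ractor touches the moved object (for example, by calling a method like obj.foo()), an error is raised.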

# move with Ractor#send
r = Ractor.new do
  obj = Ractor.receive
  obj << ' world'
end

str = 'hello'
r.send str, move: true
modified = r.take #=> 'hello world'

# str is moved, and accessing str from this Ractor is prohibited

begin
  # Error because it touches moved str.
  str << ' exception' # raise Ractor::MovedError
rescue Ractor::MovedError
  modified #=> 'hello world'
else
  raise 'unreachable'
end

# move with Ractor.yield
r = Ractor.new do
  obj = 'hello'
  Ractor.yield obj, move: true
  obj << 'world'  # raise Ractor::MovedError
end

str = r.take
begin
  r.take
rescue Ractor::RemoteError
  p str #=> "hello"
end

Some objects do not support moving, and an exception is raised.

r = Ractor.new do
  Ractor.receive
end

r.send(Thread.new{}, move: true) #=> allocator undefined for Thread (TypeError)

A class replacement technique is used to prohibit access to moved objects.
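A round trip with move: true in both directions, written with send/Ractor.receive only. This is a sketch; `buffer` and `text` are illustrative names, and touching `buffer` after the first move is expected to raise Ractor::MovedError as described above.

```ruby
main = Ractor.current

worker = Ractor.new(main) do |parent|
  text = Ractor.receive          # ownership arrives with the message
  text << ' world'
  parent.send(text, move: true)  # move it back instead of copying
end

buffer = String.new('hello')
worker.send(buffer, move: true)

# After the move, the main Ractor must not touch `buffer` any more.
moved_error =
  begin
    buffer.size
    nil
  rescue Ractor::MovedError => e
    e
  end

result = Ractor.receive
```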

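Shareable objects

The following objects are shareable.

Implementation: shareable objects (RVALUE) currently have the FL_SHAREABLE flag. This flag can be set lazily.

To make shareable objects, the Ractor.make_shareable(obj) method is provided. It tries to make obj shareable by freezing obj and all objects reachable from it recursively. This method accepts the copy: keyword (default: false). Ractor.make_shareable(obj, copy: true) tries to deep-copy obj and make the copied object shareable.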
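A short sketch of Ractor.shareable? and Ractor.make_shareable on a nested Hash, including the copy: true variant that leaves the source object unfrozen. The `config` and `source` objects are illustrative.

```ruby
config = { name: String.new('app'), tags: [String.new('a')] }

before = Ractor.shareable?(config) # unfrozen Hash, Array and Strings
Ractor.make_shareable(config)      # freezes config and everything it reaches
after = Ractor.shareable?(config)

source = { list: [1, 2] }
frozen_copy = Ractor.make_shareable(source, copy: true) # deep copy, then freeze
```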

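Language changes to isolate unshareable objects between Ractors

To isolate unshareable objects between Ractors, additional language semantics are introduced for multi-Ractor Ruby programs.

Note that these additional semantics are not needed when Ractors are not used (100% compatible with Ruby 2).

Global variables

Only the main Ractor (the Ractor created when the interpreter starts) can access global variables.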

$gv = 1
r = Ractor.new do
  $gv
end

begin
  r.take
rescue Ractor::RemoteError => e
  e.cause.message #=> 'can not access global variables from non-main Ractors'
end

Note that some special global variables, such as $stdin, $stdout, and $stderr, are Ractor-local. See [Bug #17268] for more details.
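A sketch contrasting an ordinary global with the Ractor-local $stdout. The error class is captured and sent back instead of letting it propagate; it is expected to be Ractor::IsolationError, and `$counter` is a hypothetical global.

```ruby
$counter = 1

Ractor.new(Ractor.current) do |parent|
  outcome =
    begin
      [:read, $counter] # ordinary globals are for the main Ractor only
    rescue => e
      e.class           # classes are shareable, so they can be sent
    end
  # $stdout is Ractor-local, so it is still usable here.
  parent.send([outcome, $stdout.is_a?(IO)])
end

outcome, stdout_usable = Ractor.receive
```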

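Instance variables of shareable objects

Instance variables of classes/modules can be read from non-main Ractors if the referenced values are shareable objects.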

class C
  @iv = 1
end

r = Ractor.new do
  class C
    @iv
  end
end
p r.take #=> 1

Otherwise, only the main Ractor can access instance variables of shareable objects.

class C
  @iv = [] # unshareable object
end

Ractor.new do
  class C
    begin
      p @iv
    rescue Ractor::IsolationError
      p $!.message
      #=> "can not get unshareable values from instance variables of classes/modules from non-main Ractors"
    end

    begin
      @iv = 42
    rescue Ractor::IsolationError
      p $!.message
      #=> "can not set instance variables of classes/modules by non-main Ractors"
    end
  end
end.take

shared = Ractor.new{}
shared.instance_variable_set(:@iv, 'str')

r = Ractor.new shared do |shared|
  p shared.instance_variable_get(:@iv)
end

begin
  r.take
rescue Ractor::RemoteError => e
  e.cause.message #=> can not access instance variables of shareable objects from non-main Ractors (Ractor::IsolationError)
end

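Note that instance variables of class/module objects are also restricted on non-main Ractors, as the class C example above shows.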
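Because class-level instance variables are readable from non-main Ractors when their values are shareable, one approach is to freeze configuration with Ractor.make_shareable. In this sketch, `Settings` and its `defaults` reader are hypothetical.

```ruby
class Settings
  # Made shareable, so non-main Ractors may read it.
  @defaults = Ractor.make_shareable({ retries: 3, backoff: 0.5 })

  class << self
    attr_reader :defaults
  end
end

Ractor.new(Ractor.current) do |parent|
  parent.send(Settings.defaults[:retries])
end

retries = Ractor.receive
```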

Class variables

Only the main Ractor can access class variables.

class C
  @@cv = 'str'
end

r = Ractor.new do
  class C
    p @@cv
  end
end


begin
  r.take
rescue => e
  e.class #=> Ractor::IsolationError
end

Constants

Only the main Ractor can read constants that refer to unshareable objects.

class C
  CONST = 'str'
end
r = Ractor.new do
  C::CONST
end
begin
  r.take
rescue => e
  e.class #=> Ractor::IsolationError
end

Only the main Ractor can define constants that refer to unshareable objects.

class C
end
r = Ractor.new do
  C::CONST = 'str'
end
begin
  r.take
rescue => e
  e.class #=> Ractor::IsolationError
end

To make libraries that support multiple Ractors, constants should refer only to shareable objects.

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}

In this case, TABLE refers to an unshareable Hash object, so other Ractors cannot refer to the TABLE constant. To make it shareable, we can use Ractor.make_shareable() like this:

TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )

To make this simpler, Ruby 3.0 introduced the new shareable_constant_value directive.

# shareable_constant_value: literal

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}
#=> Same as: TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )

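The shareable_constant_value directive accepts the following modes (descriptions use the example CONST = expr):

Except for the none mode (the default), it is guaranteed that assigned constants refer only to shareable objects.

See doc/syntax/comments.rdoc for more details.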
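A sketch of both constant rules from a non-main Ractor: reading a constant made shareable with Ractor.make_shareable succeeds, while reading one that refers to a mutable Array is expected to raise Ractor::IsolationError. `LOOKUP` and `MUTABLE_LIST` are hypothetical names.

```ruby
LOOKUP = Ractor.make_shareable({ a: 'ko1', b: 'ko2', c: 'ko3' })
MUTABLE_LIST = [String.new('x')] # refers to an unshareable Array

Ractor.new(Ractor.current) do |parent|
  shared_value = LOOKUP[:b]
  list_outcome =
    begin
      MUTABLE_LIST.size
    rescue => e
      e.class # expected: Ractor::IsolationError
    end
  parent.send([shared_value, list_outcome])
end

value, outcome = Ractor.receive
```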

Implementation notes

Examples

Traditional ring example in the Actor model

RN = 1_000
CR = Ractor.current

r = Ractor.new do
  p Ractor.receive
  CR << :fin
end

RN.times{
  r = Ractor.new r do |next_r|
    next_r << Ractor.receive
  end
}

p :setup_ok
r << 1
p Ractor.receive

Fork-join

def fib n
  if n < 2
    1
  else
    fib(n-2) + fib(n-1)
  end
end

RN = 10
rs = (1..RN).map do |i|
  Ractor.new i do |i|
    [i, fib(i)]
  end
end

until rs.empty?
  r, v = Ractor.select(*rs)
  rs.delete r
  p answer: v
end

Worker pool

require 'prime'

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

N = 1000
RN = 10
workers = (1..RN).map do
  Ractor.new pipe do |pipe|
    while n = pipe.take
      Ractor.yield [n, n.prime?]
    end
  end
end

(1..N).each{|i|
  pipe << i
}

pp (1..N).map{
  _r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}
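The worker pool above pulls jobs through a pipe Ractor with yield/take. A push-type variant, sketched below, has the main Ractor hand out jobs round-robin with send and collect results with Ractor.receive. `PrimeCheck` is a hypothetical stand-in for Integer#prime? from the prime library, so the sketch has no external dependency.

```ruby
# Hypothetical helper kept in a module (modules are shareable).
module PrimeCheck
  def self.prime?(n)
    return false if n < 2
    (2..Integer.sqrt(n)).none? { |d| (n % d).zero? }
  end
end

job_count = 20
pool_size = 3
main = Ractor.current

workers = pool_size.times.map do
  Ractor.new(main) do |parent|
    loop do
      n = Ractor.receive
      break if n == :stop
      parent.send([n, PrimeCheck.prime?(n)])
    end
  end
end

# Hand out jobs round-robin, then tell every worker to stop.
(1..job_count).each { |i| workers[i % pool_size].send(i) }
workers.each { |w| w.send(:stop) }

results = job_count.times.map { Ractor.receive }.sort_by(&:first)
```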

Pipeline

# pipeline with yield/take
r1 = Ractor.new do
  'r1'
end

r2 = Ractor.new r1 do |r1|
  r1.take + 'r2'
end

r3 = Ractor.new r2 do |r2|
  r2.take + 'r3'
end

p r3.take #=> 'r1r2r3'

# pipeline with send/receive

r3 = Ractor.new Ractor.current do |cr|
  cr.send Ractor.receive + 'r3'
end

r2 = Ractor.new r3 do |r3|
  r3.send Ractor.receive + 'r2'
end

r1 = Ractor.new r2 do |r2|
  r2.send Ractor.receive + 'r1'
end

r1 << 'r0'
p Ractor.receive #=> "r0r1r2r3"

Supervise

# ring example again

r = Ractor.current
(1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    r.send Ractor.receive + "r#{i}"
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"

# ring example with an error

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current) #=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
r.send "e0"
p Ractor.select(*rs, Ractor.current)
#=>
# <Thread:0x000056262de28bd8 run> terminated with exception (report_on_exception is true):
# Traceback (most recent call last):
#         2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>'
#         1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
# /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
# Traceback (most recent call last):
#         2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>'
#         1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
# /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
#         1: from /home/ko1/src/ruby/trunk/test.rb:21:in `<main>'
# <internal:ractor>:69:in `select': thrown by remote Ractor. (Ractor::RemoteError)

# resend non-error message

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current)
#=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
msg = 'e0'
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  msg = 'r0'
  retry
end

#=> <internal:ractor>:100:in `send': The incoming-port is already closed (Ractor::ClosedError)
# because r (== rs[-1]) is terminated.

# ring example with supervisor and re-start

def make_ractor r, i
  Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
end

r = Ractor.current
rs = (1..10).map{|i|
  r = make_ractor(r, i)
}

msg = 'e0' # error causing message
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  r = rs[-1] = make_ractor(rs[-2], rs.size-1)
  msg = 'x0'
  retry
end

#=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]