类 Ractor

Ractor 是 Ruby 的一种 Actor 模型抽象,它提供线程安全的并行执行。

Ractor.new 创建一个新的 Ractor,它可以并行运行。

# The simplest ractor
r = Ractor.new {puts "I am in Ractor!"}
r.take # wait for it to finish
# Here, "I am in Ractor!" is printed

Ractor 不会彼此共享所有对象。这样做有两个主要好处:在 Ractor 之间,数据竞争和竞争条件等线程安全问题是不可能的。另一个好处是并行性。

为了实现这一点,Ractor 之间的对象共享是有限的。例如,与线程不同,Ractor 无法访问其他 Ractor 中可用的所有对象。即使通常通过外部作用域中的变量可用的对象也禁止在 Ractor 之间使用。

a = 1
r = Ractor.new {puts "I am in Ractor! a=#{a}"}
# fails immediately with
# ArgumentError (can not isolate a Proc because it accesses outer variables (a).)

对象必须显式共享

a = 1
r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"}

在 CRuby(默认实现)上,每个 Ractor 都持有全局虚拟机锁 (GVL),因此 Ractor 可以并行执行而不会相互锁定。这与 CRuby 上线程的情况不同。

与其访问共享状态,不如通过将对象作为消息发送和接收来将对象传递到 Ractor 和从 Ractor 传递出去。

a = 1
r = Ractor.new do
  a_in_ractor = receive # receive blocks until somebody passes a message
  puts "I am in Ractor! a=#{a_in_ractor}"
end
r.send(a)  # pass it
r.take
# Here, "I am in Ractor! a=1" is printed

有两种用于发送/接收消息的方法对

除此之外,传递给 Ractor.new 的任何参数都会传递给块,并在那里可用,就像通过 Ractor.receive 接收一样,最后一个块值会被发送到 Ractor 外部,就像通过 Ractor.yield 发送一样。

一个经典的乒乓球演示

server = Ractor.new(name: "server") do
  puts "Server starts: #{self.inspect}"
  puts "Server sends: ping"
  Ractor.yield 'ping'                       # The server doesn't know the receiver and sends to whoever interested
  received = Ractor.receive                 # The server doesn't know the sender and receives from whoever sent
  puts "Server received: #{received}"
end

client = Ractor.new(server) do |srv|        # The server is sent to the client, and available as srv
  puts "Client starts: #{self.inspect}"
  received = srv.take                       # The client takes a message from the server
  puts "Client received from " \
       "#{srv.inspect}: #{received}"
  puts "Client sends to " \
       "#{srv.inspect}: pong"
  srv.send 'pong'                           # The client sends a message to the server
end

[client, server].each(&:take)               # Wait until they both finish

这将输出类似的内容

Server starts: #<Ractor:#2 server test.rb:1 running>
Server sends: ping
Client starts: #<Ractor:#3 test.rb:8 running>
Client received from #<Ractor:#2 server test.rb:1 blocking>: ping
Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong
Server received: pong

Ractor 通过输入端口接收消息,并通过输出端口发送消息。可以使用 Ractor#close_incomingRactor#close_outgoing 分别禁用其中一个。当 Ractor 终止时,其端口会自动关闭。

可共享和不可共享对象

当对象被发送到 Ractor 并从 Ractor 发送回来时,了解对象是可共享还是不可共享非常重要。大多数 Ruby 对象都是不可共享对象。即使是冻结对象,如果它们包含(通过其实例变量)未冻结对象,也可能是不可共享的。

可共享对象是指那些可以在多个线程中使用而不会影响线程安全的对象,例如数字、truefalseRactor.shareable? 允许您检查这一点,而 Ractor.make_shareable 尝试使对象可共享(如果它还没有),如果无法做到,则会报错。

Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are shareable
Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
Ractor.shareable?('foo'.freeze) #=> true
Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen

ary = ['hello', 'world']
ary.frozen?                 #=> false
ary[0].frozen?              #=> false
Ractor.make_shareable(ary)
ary.frozen?                 #=> true
ary[0].frozen?              #=> true
ary[1].frozen?              #=> true

当可共享对象被发送(通过 sendRactor.yield)时,不会对其进行任何额外的处理。它只是变得可以被两个 Ractor 使用。当不可共享对象被发送时,它可以被复制移动。前者是默认行为,它通过深度克隆(Object#clone)其结构的不可共享部分来完全复制对象。

data = ['foo', 'bar'.freeze]
r = Ractor.new do
  data2 = Ractor.receive
  puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
end
r.send(data)
r.take
puts "Outside  : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"

这将输出类似的内容

In ractor: 340, 360, 320
Outside  : 380, 400, 320

请注意,数组和数组内部的非冻结字符串的 object id 在 Ractor 中发生了变化,因为它们是不同的对象。第二个数组的元素,即可共享的冻结字符串,是同一个对象。

对象的深度克隆可能很慢,有时甚至不可能。或者,在发送时可以使用 move: true。这将移动不可共享对象到接收 Ractor,使其无法被发送 Ractor 访问。

data = ['foo', 'bar']
r = Ractor.new do
  data_in_ractor = Ractor.receive
  puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}"
end
r.send(data, move: true)
r.take
puts "Outside: moved? #{Ractor::MovedObject === data}"
puts "Outside: #{data.inspect}"

这将输出

In ractor: 100, 120
Outside: moved? true
test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)

请注意,即使 inspect(以及更基本的方法,如 __id__)也无法在移动的对象上访问。

ClassModule 对象是可共享的,因此类/模块定义在 Ractor 之间共享。Ractor 对象也是可共享的。对可共享对象的所有操作都是线程安全的,因此线程安全属性将被保留。我们无法在 Ruby 中定义可变的可共享对象,但 C 扩展可以引入它们。

如果共享对象的实例变量的值不可共享,则禁止在其他 Ractor 中访问(获取)这些实例变量。这种情况可能发生是因为模块/类是可共享的,但它们可能具有不可共享值的实例变量。在非主 Ractor 中,还禁止设置类/模块的实例变量(即使值是可共享的)。

class C
  class << self
    attr_accessor :tricky
  end
end

C.tricky = "unshareable".dup

r = Ractor.new(C) do |cls|
  puts "I see #{cls}"
  puts "I can't see #{cls.tricky}"
  cls.tricky = true # doesn't get here, but this would also raise an error
end
r.take
# I see C
# can not access instance variables of classes/modules from non-main Ractors (RuntimeError)

如果常量是可共享的,则 Ractor 可以访问它们。主 Ractor 是唯一可以访问不可共享常量的 Ractor。

GOOD = 'good'.freeze
BAD = 'bad'.dup

r = Ractor.new do
  puts "GOOD=#{GOOD}"
  puts "BAD=#{BAD}"
end
r.take
# GOOD=good
# can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError)

# Consider the same C class from above

r = Ractor.new do
  puts "I see #{C}"
  puts "I can't see #{C.tricky}"
end
r.take
# I see C
# can not access instance variables of classes/modules from non-main Ractors (RuntimeError)

另请参阅 注释语法 说明中关于 # shareable_constant_value 编译指示的描述。

Ractor 与线程

每个 Ractor 都有自己的主 Thread。可以在 Ractor 内部创建新线程(并且在 CRuby 上,它们与该 Ractor 的其他线程共享 GVL)。

r = Ractor.new do
  a = 1
  Thread.new {puts "Thread in ractor: a=#{a}"}.join
end
r.take
# Here "Thread in ractor: a=1" will be printed

关于代码示例的说明

在下面的示例中,我们有时使用以下方法来等待当前未阻塞的 Ractor 完成(或取得进展)。

def wait
  sleep(0.1)
end

这 **仅用于演示目的**,不应在实际代码中使用。大多数情况下,使用 take 来等待 Ractor 完成。

参考

有关更多详细信息,请参阅 Ractor 设计文档

公共类方法

count() 点击以切换源代码

返回当前正在运行或阻塞(等待)的 Ractor 数量。

Ractor.count                   #=> 1
r = Ractor.new(name: 'example') { Ractor.yield(1) }
Ractor.count                   #=> 2 (main + example ractor)
r.take                         # wait for Ractor.yield(1)
r.take                         # wait until r will finish
Ractor.count                   #=> 1
# File ruby_3_3_0/ractor.rb, line 302
def self.count
  __builtin_cexpr! %q{
    ULONG2NUM(GET_VM()->ractor.cnt);
  }
end
current() 点击以切换源代码

返回当前正在执行的 Ractor

Ractor.current #=> #<Ractor:#1 running>
# File ruby_3_3_0/ractor.rb, line 288
def self.current
  __builtin_cexpr! %q{
    rb_ractor_self(rb_ec_ractor_ptr(ec));
  }
end
main() 点击以切换源代码

返回主 Ractor

# File ruby_3_3_0/ractor.rb, line 848
def self.main
  __builtin_cexpr! %q{
    rb_ractor_self(GET_VM()->ractor.main_ractor);
  }
end
make_shareable(obj, copy: false) → shareable_obj 点击以切换源代码

使 obj 在 Ractor 之间可共享。

obj 及其引用的所有对象都将被冻结,除非它们已经是可共享的。

如果 copy 关键字为 true,它将在冻结对象之前复制它们,并且不会修改 obj 或其内部对象。

请注意,此方法的规范和实现尚未成熟,将来可能会更改。

obj = ['test']
Ractor.shareable?(obj)     #=> false
Ractor.make_shareable(obj) #=> ["test"]
Ractor.shareable?(obj)     #=> true
obj.frozen?                #=> true
obj[0].frozen?             #=> true

# Copy vs non-copy versions:
obj1 = ['test']
obj1s = Ractor.make_shareable(obj1)
obj1.frozen?                        #=> true
obj1s.object_id == obj1.object_id   #=> true
obj2 = ['test']
obj2s = Ractor.make_shareable(obj2, copy: true)
obj2.frozen?                        #=> false
obj2s.frozen?                       #=> true
obj2s.object_id == obj2.object_id   #=> false
obj2s[0].object_id == obj2[0].object_id #=> false

另请参阅 Ractor 类文档中的“可共享和不可共享对象”部分。

# File ruby_3_3_0/ractor.rb, line 825
def self.make_shareable obj, copy: false
  if copy
    __builtin_cexpr! %q{
      rb_ractor_make_shareable_copy(obj);
    }
  else
    __builtin_cexpr! %q{
      rb_ractor_make_shareable(obj);
    }
  end
end
new(*args, name: nil) {|*args| block } → ractor 点击以切换源代码

使用参数和代码块创建一个新的 Ractor。

给定的代码块(Proc)将被隔离(无法访问任何外部变量)。代码块内部的 self 将引用当前的 Ractor。

r = Ractor.new { puts "Hi, I am #{self.inspect}" }
r.take
# Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"

传递的任何 args 都将通过与通过 send/Ractor.receive 发送的对象相同的规则传播到代码块参数。如果 args 中的参数不可共享,它将被复制(通过深度克隆,这可能效率低下)。

arg = [1, 2, 3]
puts "Passing: #{arg} (##{arg.object_id})"
r = Ractor.new(arg) {|received_arg|
  puts "Received: #{received_arg} (##{received_arg.object_id})"
}
r.take
# Prints:
#   Passing: [1, 2, 3] (#280)
#   Received: [1, 2, 3] (#300)

Ractor 的 name 可以为了调试目的而设置。

r = Ractor.new(name: 'my ractor') {}; r.take
p r
#=> #<Ractor:#3 my ractor test.rb:1 terminated>
# File ruby_3_3_0/ractor.rb, line 273
def self.new(*args, name: nil, &block)
  b = block # TODO: builtin bug
  raise ArgumentError, "must be called with a block" unless block
  if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
    warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
         "Also there are many implementation issues.", uplevel: 0, category: :experimental)
  end
  loc = caller_locations(1, 1).first
  loc = "#{loc.path}:#{loc.lineno}"
  __builtin_ractor_create(loc, name, args, b)
end
receive → msg 点击切换源代码

从当前 Ractor 的传入端口接收消息(该消息由另一个 Ractor 的 send 发送到该端口)。

r = Ractor.new do
  v1 = Ractor.receive
  puts "Received: #{v1}"
end
r.send('message1')
r.take
# Here will be printed: "Received: message1"

或者,可以使用私有实例方法 receive

r = Ractor.new do
  v1 = receive
  puts "Received: #{v1}"
end
r.send('message1')
r.take
# This prints: "Received: message1"

如果队列为空,该方法将阻塞。

r = Ractor.new do
  puts "Before first receive"
  v1 = Ractor.receive
  puts "Received: #{v1}"
  v2 = Ractor.receive
  puts "Received: #{v2}"
end
wait
puts "Still not received"
r.send('message1')
wait
puts "Still received only one"
r.send('message2')
r.take

输出

Before first receive
Still not received
Received: message1
Still received only one
Received: message2

如果在 Ractor 上调用了 close_incoming,并且传入队列中没有更多消息,则该方法将引发 Ractor::ClosedError

Ractor.new do
  close_incoming
  receive
end
wait
# in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)
# File ruby_3_3_0/ractor.rb, line 430
def self.receive
  __builtin_cexpr! %q{
    ractor_receive(ec, rb_ec_ractor_ptr(ec))
  }
end
也称为:recv
receive_if {|msg| block } → msg 点击切换源代码

仅接收特定消息。

Ractor.receive 不同,Ractor.receive_if 可以接收代码块中的模式(或任何过滤器),您可以选择接受 Ractor 的传入队列中可用的消息。

r = Ractor.new do
  p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3"
  p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1"
  p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2"
end
r << "bar1"
r << "baz2"
r << "foo3"
r.take

这将输出

foo3
bar1
baz2

如果代码块返回真值,则该消息将从传入队列中删除并返回。否则,该消息将保留在传入队列中,并且给定代码块将检查下一条消息。

如果传入队列中没有剩余消息,则该方法将阻塞,直到有新消息到达。

如果代码块通过 break/return/exception/throw 退出,则该消息将从传入队列中删除,就像返回了真值一样。

r = Ractor.new do
  val = Ractor.receive_if{|msg| msg.is_a?(Array)}
  puts "Received successfully: #{val}"
end

r.send(1)
r.send('test')
wait
puts "2 non-matching sent, nothing received"
r.send([1, 2, 3])
wait

打印

2 non-matching sent, nothing received
Received successfully: [1, 2, 3]

请注意,您不能在给定的代码块中递归调用 receive/receive_if。您不应该在代码块中执行除消息过滤之外的任何任务。

Ractor.current << true
Ractor.receive_if{|msg| Ractor.receive}
#=> `receive': can not call receive/receive_if recursively (Ractor::Error)
# File ruby_3_3_0/ractor.rb, line 509
def self.receive_if &b
  Primitive.ractor_receive_if b
end
recv()
别名:receive
select(*ractors, [yield_value:, move: false]) → [ractor or symbol, obj] 点击切换源代码

等待任何 ractor 在其输出端口中拥有数据,从该 ractor 读取数据,然后返回该 ractor 和接收到的对象。

r1 = Ractor.new {Ractor.yield 'from 1'}
r2 = Ractor.new {Ractor.yield 'from 2'}

r, obj = Ractor.select(r1, r2)

puts "received #{obj.inspect} from #{r.inspect}"
# Prints: received "from 1" from #<Ractor:#2 test.rb:1 running>
# But could just as well print "from r2" here, either prints could be first.

如果给定的 ractors 之一是当前 ractor,并且它被选中,则 r 将包含 :receive 符号而不是 ractor 对象。

r1 = Ractor.new(Ractor.current) do |main|
  main.send 'to main'
  Ractor.yield 'from 1'
end
r2 = Ractor.new do
  Ractor.yield 'from 2'
end

r, obj = Ractor.select(r1, r2, Ractor.current)
puts "received #{obj.inspect} from #{r.inspect}"
# Could print: received "to main" from :receive

如果提供了 yield_value,则如果另一个 ractor 正在调用 take,则可以产生该值。在这种情况下,将返回对 [:yield, nil]

r1 = Ractor.new(Ractor.current) do |main|
  puts "Received from main: #{main.take}"
end

puts "Trying to select"
r, obj = Ractor.select(r1, Ractor.current, yield_value: 123)
wait
puts "Received #{obj.inspect} from #{r.inspect}"

这将打印

Trying to select
Received from main: 123
Received nil from :yield

move 布尔标志定义是否复制(默认)或移动产生的值。

# File ruby_3_3_0/ractor.rb, line 358
def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
  raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?

  if ractors.delete Ractor.current
    do_receive = true
  else
    do_receive = false
  end

  __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move
end
shareable?(obj) → true | false 点击切换源代码

检查对象是否可以被 ractors 共享。

Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are frozen
Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
Ractor.shareable?('foo'.freeze) #=> true

另请参阅 Ractor 类文档中的“可共享和不可共享对象”部分。

# File ruby_3_3_0/ractor.rb, line 784
def self.shareable? obj
  __builtin_cexpr! %q{
    RBOOL(rb_ractor_shareable_p(obj));
  }
end
yield(msg, move: false) → nil 点击切换源代码

向当前 ractor 的输出端口发送一条消息,以便被 take 接受。

r = Ractor.new {Ractor.yield 'Hello from ractor'}
puts r.take
# Prints: "Hello from ractor"

此方法是阻塞的,只有在有人消费发送的消息后才会返回。

r = Ractor.new do
  Ractor.yield 'Hello from ractor'
  puts "Ractor: after yield"
end
wait
puts "Still not taken"
puts r.take

这将打印

Still not taken
Hello from ractor
Ractor: after yield

如果输出端口已使用 close_outgoing 关闭,则该方法将引发

r = Ractor.new do
  close_outgoing
  Ractor.yield 'Hello from ractor'
end
wait
# `yield': The outgoing-port is already closed (Ractor::ClosedError)

move 参数的含义与 send 相同。

# File ruby_3_3_0/ractor.rb, line 643
def self.yield(obj, move: false)
  __builtin_cexpr! %q{
    ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
  }
end

公共实例方法

<<(obj, move: false)
别名:send
[](sym) 点击切换源代码

从 ractor 本地存储中获取值

# File ruby_3_3_0/ractor.rb, line 838
def [](sym)
  Primitive.ractor_local_value(sym)
end
[]=(sym, val) 点击切换源代码

在 ractor 本地存储中设置值

# File ruby_3_3_0/ractor.rb, line 843
def []=(sym, val)
  Primitive.ractor_local_value_set(sym, val)
end
close_incoming → true | false 点击切换源代码

关闭传入端口并返回它是否已关闭。所有进一步尝试在 ractor 中 Ractor.receive,以及向 ractor send 将使用 Ractor::ClosedError 失败。

r = Ractor.new {sleep(500)}
r.close_incoming  #=> false
r.close_incoming  #=> true
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
# File ruby_3_3_0/ractor.rb, line 749
def close_incoming
  __builtin_cexpr! %q{
    ractor_close_incoming(ec, RACTOR_PTR(self));
  }
end
close_outgoing → true | false 点击切换源代码

关闭传出端口并返回它是否已关闭。所有进一步尝试在 ractor 中 Ractor.yield,以及从 ractor take 将使用 Ractor::ClosedError 失败。

r = Ractor.new {sleep(500)}
r.close_outgoing  #=> false
r.close_outgoing  #=> true
r.take
# Ractor::ClosedError (The outgoing-port is already closed)
# File ruby_3_3_0/ractor.rb, line 767
def close_outgoing
  __builtin_cexpr! %q{
    ractor_close_outgoing(ec, RACTOR_PTR(self));
  }
end
inspect() 点击切换源代码
# File ruby_3_3_0/ractor.rb, line 716
def inspect
  loc  = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
  name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
  id   = __builtin_cexpr! %q{ UINT2NUM(rb_ractor_id(RACTOR_PTR(self))) }
  status = __builtin_cexpr! %q{
    rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_))
  }
  "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>"
end
也称为:to_s
name() 点击切换源代码

Ractor.new 中设置的名称,或 nil

# File ruby_3_3_0/ractor.rb, line 729
def name
  __builtin_cexpr! %q{RACTOR_PTR(self)->name}
end
recv()
别名:receive
send(msg, move: false) → self 点击切换源代码

将消息发送到 Ractor 的传入队列,以便被 Ractor.receive 接收。

r = Ractor.new do
  value = Ractor.receive
  puts "Received #{value}"
end
r.send 'message'
# Prints: "Received: message"

该方法是非阻塞的(即使 Ractor 尚未准备好接收任何内容,也会立即返回)。

r = Ractor.new {sleep(5)}
r.send('test')
puts "Sent successfully"
# Prints: "Sent successfully" immediately

尝试发送到已完成执行的 Ractor 会引发 Ractor::ClosedError

r = Ractor.new {}
r.take
p r
# "#<Ractor:#6 (irb):23 terminated>"
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)

如果在 Ractor 上调用了 close_incoming,该方法也会引发 Ractor::ClosedError

r =  Ractor.new do
  sleep(500)
  receive
end
r.close_incoming
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
# The error is raised immediately, not when the ractor tries to receive

如果 obj 不可共享,默认情况下它将通过深度克隆复制到接收 Ractor 中。如果传递了 move: true,则该对象将被移动到接收 Ractor 中,并且发送方将无法访问它。

r = Ractor.new {puts "Received: #{receive}"}
msg = 'message'
r.send(msg, move: true)
r.take
p msg

这将打印

Received: message
in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>

发送方对该对象及其部分的所有引用将变得无效。

r = Ractor.new {puts "Received: #{receive}"}
s = 'message'
ary = [s]
copy = ary.dup
r.send(ary, move: true)

s.inspect
# Ractor::MovedError (can not send any methods to a moved object)
ary.class
# Ractor::MovedError (can not send any methods to a moved object)
copy.class
# => Array, it is different object
copy[0].inspect
# Ractor::MovedError (can not send any methods to a moved object)
# ...but its item was still a reference to `s`, which was moved

如果该对象是可共享的,则 move: true 对它没有影响。

r = Ractor.new {puts "Received: #{receive}"}
s = 'message'.freeze
r.send(s, move: true)
s.inspect #=> "message", still available
# File ruby_3_3_0/ractor.rb, line 599
def send(obj, move: false)
  __builtin_cexpr! %q{
    ractor_send(ec, RACTOR_PTR(self), obj, move)
  }
end
也称为:<<
take → msg 点击切换源代码

从 Ractor 的传出端口获取消息,该消息由 Ractor.yield 或在 Ractor 终止时放入。

r = Ractor.new do
  Ractor.yield 'explicit yield'
  'last value'
end
puts r.take #=> 'explicit yield'
puts r.take #=> 'last value'
puts r.take # Ractor::ClosedError (The outgoing-port is already closed)

最后一个值也被发送到传出端口这一事实意味着 take 可以用作 Thread#join 的类似物(“只需等待 Ractor 完成”)。但是,如果有人已经消费了该消息,它会引发异常。

如果传出端口使用 close_outgoing 关闭,该方法将引发 Ractor::ClosedError

r = Ractor.new do
  sleep(500)
  Ractor.yield 'Hello from ractor'
end
r.close_outgoing
r.take
# Ractor::ClosedError (The outgoing-port is already closed)
# The error would be raised immediately, not when ractor will try to receive

如果在 Ractor 中引发了未捕获的异常,它将通过 take 作为 Ractor::RemoteError 传播。

r = Ractor.new {raise "Something weird happened"}

begin
  r.take
rescue => e
  p e              #  => #<Ractor::RemoteError: thrown by remote Ractor.>
  p e.ractor == r  # => true
  p e.cause        # => #<RuntimeError: Something weird happened>
end

Ractor::ClosedErrorStopIteration 的子类,因此 Ractor 的终止将使任何接收此消息的循环退出,而不会传播错误。

r = Ractor.new do
  3.times {|i| Ractor.yield "message #{i}"}
  "finishing"
end

loop {puts "Received: " + r.take}
puts "Continue successfully"

这将打印

Received: message 0
Received: message 1
Received: message 2
Received: finishing
Continue successfully
# File ruby_3_3_0/ractor.rb, line 710
def take
  __builtin_cexpr! %q{
    ractor_take(ec, RACTOR_PTR(self))
  }
end
to_s()
别名:inspect

私有实例方法

receive() 点击切换源代码

Ractor.receive 相同

# File ruby_3_3_0/ractor.rb, line 441
        def receive
  __builtin_cexpr! %q{
    ractor_receive(ec, rb_ec_ractor_ptr(ec))
  }
end
别名:recv
receive_if(&b) 点击切换源代码

Ractor.receive_if 相同

# File ruby_3_3_0/ractor.rb, line 514
        def receive_if &b
  Primitive.ractor_receive_if b
end