class Ractor

Ractor 是 Ruby 的 Actor 模型抽象,提供线程安全的并行执行。

Ractor.new 创建一个新的 Ractor,它可以并行运行。

# The simplest ractor
r = Ractor.new {puts "I am in Ractor!"}
r.take # wait for it to finish
# Here, "I am in Ractor!" is printed

Ractor 之间不共享所有对象。这有两个主要好处:在 Ractor 之间,不可能出现诸如数据竞争和竞态条件之类的线程安全问题。另一个好处是并行性。

为了实现这一点,跨 Ractor 的对象共享是有限的。例如,与线程不同,Ractor 不能访问其他 Ractor 中所有可用的对象。即使通常通过外部作用域中的变量可用的对象也被禁止在 Ractor 之间使用。

a = 1
r = Ractor.new {puts "I am in Ractor! a=#{a}"}
# fails immediately with
# ArgumentError (can not isolate a Proc because it accesses outer variables (a).)

对象必须显式共享

a = 1
r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"}

在 CRuby(默认实现)上,全局虚拟机锁 (GVL) 是按 Ractor 持有的,因此 Ractor 可以并行执行而不会相互锁定。这与 CRuby 上的线程情况不同。

应该通过发送和接收消息来在 Ractor 之间传递对象,而不是访问共享状态。

a = 1
r = Ractor.new do
  a_in_ractor = receive # receive blocks until somebody passes a message
  puts "I am in Ractor! a=#{a_in_ractor}"
end
r.send(a)  # pass it
r.take
# Here, "I am in Ractor! a=1" is printed

有两对用于发送/接收消息的方法

除此之外,传递给 Ractor.new 的任何参数都会传递给代码块,并且可以在那里像通过 Ractor.receive 接收一样使用,并且最后一个代码块的值会像通过 Ractor.yield 发送一样发送到 Ractor 外部。

一个经典的乒乓球示例

server = Ractor.new(name: "server") do
  puts "Server starts: #{self.inspect}"
  puts "Server sends: ping"
  Ractor.yield 'ping'                       # The server doesn't know the receiver and sends to whoever interested
  received = Ractor.receive                 # The server doesn't know the sender and receives from whoever sent
  puts "Server received: #{received}"
end

client = Ractor.new(server) do |srv|        # The server is sent to the client, and available as srv
  puts "Client starts: #{self.inspect}"
  received = srv.take                       # The client takes a message from the server
  puts "Client received from " \
       "#{srv.inspect}: #{received}"
  puts "Client sends to " \
       "#{srv.inspect}: pong"
  srv.send 'pong'                           # The client sends a message to the server
end

[client, server].each(&:take)               # Wait until they both finish

这将输出类似以下内容

Server starts: #<Ractor:#2 server test.rb:1 running>
Server sends: ping
Client starts: #<Ractor:#3 test.rb:8 running>
Client received from #<Ractor:#2 server test.rb:1 blocking>: ping
Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong
Server received: pong

Ractor 通过传入端口接收消息,并通过传出端口发送消息。可以使用 Ractor#close_incomingRactor#close_outgoing 分别禁用其中一个端口。当 Ractor 终止时,其端口会自动关闭。

可共享和不可共享的对象

当一个对象在 Ractor 之间发送和接收时,了解该对象是否可共享非常重要。大多数 Ruby 对象都是不可共享的对象。即使是冻结的对象,如果它们包含(通过其实例变量)未冻结的对象,也可能是不可共享的。

可共享对象是可以被多个线程使用而不会损害线程安全的对象,例如数字、truefalseRactor.shareable? 允许你检查这一点,Ractor.make_shareable 尝试使对象可共享(如果它还不是),如果无法做到,则会报错。

Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are shareable
Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
Ractor.shareable?('foo'.freeze) #=> true
Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen

ary = ['hello', 'world']
ary.frozen?                 #=> false
ary[0].frozen?              #=> false
Ractor.make_shareable(ary)
ary.frozen?                 #=> true
ary[0].frozen?              #=> true
ary[1].frozen?              #=> true

当发送可共享对象(通过 sendRactor.yield)时,不会对其进行额外的处理。它只是变得可以被两个 Ractor 使用。当发送不可共享对象时,它可以被复制移动。第一个是默认的,它通过深度克隆(Object#clone)其结构中不可共享的部分来完全复制对象。

data = ['foo', 'bar'.freeze]
r = Ractor.new do
  data2 = Ractor.receive
  puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
end
r.send(data)
r.take
puts "Outside  : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"

这将输出类似以下内容

In ractor: 340, 360, 320
Outside  : 380, 400, 320

请注意,数组和数组内部的未冻结字符串的对象 ID 在 Ractor 中已更改,因为它们是不同的对象。第二个数组的元素(一个可共享的冻结字符串)是同一个对象。

深度克隆对象可能很慢,有时甚至不可能。或者,可以在发送期间使用 move: true。这将把不可共享的对象移动到接收 Ractor,使其无法被发送 Ractor 访问。

data = ['foo', 'bar']
r = Ractor.new do
  data_in_ractor = Ractor.receive
  puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}"
end
r.send(data, move: true)
r.take
puts "Outside: moved? #{Ractor::MovedObject === data}"
puts "Outside: #{data.inspect}"

这将输出

In ractor: 100, 120
Outside: moved? true
test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)

请注意,即使是 inspect(以及更基本的方法,如 __id__)在移动的对象上也是无法访问的。

ClassModule 对象是可共享的,因此类/模块定义在 Ractor 之间共享。Ractor 对象也是可共享的。所有对可共享对象的操作都是线程安全的,因此将保持线程安全属性。我们不能在 Ruby 中定义可变的可共享对象,但是 C 扩展可以引入它们。

如果变量的值不可共享,则禁止访问(获取)其他 Ractor 中可共享对象的实例变量。这可能会发生,因为模块/类是可共享的,但它们可以具有值不可共享的实例变量。在非主 Ractor 中,也禁止在类/模块上设置实例变量(即使值是可共享的)。

class C
  class << self
    attr_accessor :tricky
  end
end

C.tricky = "unshareable".dup

r = Ractor.new(C) do |cls|
  puts "I see #{cls}"
  puts "I can't see #{cls.tricky}"
  cls.tricky = true # doesn't get here, but this would also raise an error
end
r.take
# I see C
# can not access instance variables of classes/modules from non-main Ractors (RuntimeError)

如果常量是可共享的,则 Ractor 可以访问它们。主 Ractor 是唯一可以访问不可共享常量的 Ractor。

GOOD = 'good'.freeze
BAD = 'bad'.dup

r = Ractor.new do
  puts "GOOD=#{GOOD}"
  puts "BAD=#{BAD}"
end
r.take
# GOOD=good
# can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError)

# Consider the same C class from above

r = Ractor.new do
  puts "I see #{C}"
  puts "I can't see #{C.tricky}"
end
r.take
# I see C
# can not access instance variables of classes/modules from non-main Ractors (RuntimeError)

另请参阅 注释语法说明中的 # shareable_constant_value 编译指示的描述。

Ractor 与线程

每个 Ractor 都有自己的主 Thread。可以从 Ractor 内部创建新线程(在 CRuby 上,它们与此 Ractor 的其他线程共享 GVL)。

r = Ractor.new do
  a = 1
  Thread.new {puts "Thread in ractor: a=#{a}"}.join
end
r.take
# Here "Thread in ractor: a=1" will be printed

关于代码示例的说明

在下面的示例中,有时我们会使用以下方法来等待当前未阻塞的 Ractor 完成(或取得进展)。

def wait
  sleep(0.1)
end

它**仅用于演示目的**,不应在实际代码中使用。大多数时候,take 用于等待 Ractor 完成。

参考

有关更多详细信息,请参见 Ractor 设计文档

公共类方法

[](sym) 点击以切换源代码

从当前 Ractor 的 Ractor 本地存储中获取一个值

# File ruby_3_4_1/ractor.rb, line 851
def self.[](sym)
  Primitive.ractor_local_value(sym)
end
[]=(sym, val) 点击以切换源代码

在当前 Ractor 的 Ractor 本地存储中设置一个值

# File ruby_3_4_1/ractor.rb, line 856
def self.[]=(sym, val)
  Primitive.ractor_local_value_set(sym, val)
end
count() 点击以切换源代码

返回当前正在运行或阻塞(等待)的 Ractor 的数量。

Ractor.count                   #=> 1
r = Ractor.new(name: 'example') { Ractor.yield(1) }
Ractor.count                   #=> 2 (main + example ractor)
r.take                         # wait for Ractor.yield(1)
r.take                         # wait until r will finish
Ractor.count                   #=> 1
# File ruby_3_4_1/ractor.rb, line 302
def self.count
  __builtin_cexpr! %q{
    ULONG2NUM(GET_VM()->ractor.cnt);
  }
end
current() 点击以切换源代码

返回当前正在执行的 Ractor

Ractor.current #=> #<Ractor:#1 running>
# File ruby_3_4_1/ractor.rb, line 288
def self.current
  __builtin_cexpr! %q{
    rb_ractor_self(rb_ec_ractor_ptr(ec));
  }
end
main() 点击以切换源代码

返回主 Ractor

# File ruby_3_4_1/ractor.rb, line 879
def self.main
  __builtin_cexpr! %q{
    rb_ractor_self(GET_VM()->ractor.main_ractor);
  }
end
main?() 点击以切换源代码

如果当前 Ractor 是主 Ractor,则返回 true

# File ruby_3_4_1/ractor.rb, line 886
def self.main?
  __builtin_cexpr! %q{
    RBOOL(GET_VM()->ractor.main_ractor == rb_ec_ractor_ptr(ec))
  }
end
make_shareable(obj, copy: false) → shareable_obj 点击以切换源代码

使 obj 在 Ractor 之间可共享。

obj 及其引用的所有对象都将被冻结,除非它们已经是可共享的。

如果 copy 关键字为 true,它将在冻结对象之前复制它们,并且不会修改 obj 或其内部对象。

请注意,此方法的规范和实现尚未成熟,将来可能会更改。

obj = ['test']
Ractor.shareable?(obj)     #=> false
Ractor.make_shareable(obj) #=> ["test"]
Ractor.shareable?(obj)     #=> true
obj.frozen?                #=> true
obj[0].frozen?             #=> true

# Copy vs non-copy versions:
obj1 = ['test']
obj1s = Ractor.make_shareable(obj1)
obj1.frozen?                        #=> true
obj1s.object_id == obj1.object_id   #=> true
obj2 = ['test']
obj2s = Ractor.make_shareable(obj2, copy: true)
obj2.frozen?                        #=> false
obj2s.frozen?                       #=> true
obj2s.object_id == obj2.object_id   #=> false
obj2s[0].object_id == obj2[0].object_id #=> false

另请参阅 Ractor 类文档中的“可共享和不可共享的对象”部分。

# File ruby_3_4_1/ractor.rb, line 826
def self.make_shareable obj, copy: false
  if copy
    __builtin_cexpr! %q{
      rb_ractor_make_shareable_copy(obj);
    }
  else
    __builtin_cexpr! %q{
      rb_ractor_make_shareable(obj);
    }
  end
end
new(*args, name: nil) {|*args| block } → ractor 点击以切换源代码

使用 args 和一个代码块创建一个新的 Ractor。

给定的代码块 (Proc) 将被隔离(无法访问任何外部变量)。代码块内的 self 将引用当前 Ractor。

r = Ractor.new { puts "Hi, I am #{self.inspect}" }
r.take
# Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"

传递的任何 args 都会按照通过 send/Ractor.receive 发送对象的相同规则传播到代码块参数。如果 args 中的参数不可共享,它将被复制(通过深度克隆,这可能效率低下)。

arg = [1, 2, 3]
puts "Passing: #{arg} (##{arg.object_id})"
r = Ractor.new(arg) {|received_arg|
  puts "Received: #{received_arg} (##{received_arg.object_id})"
}
r.take
# Prints:
#   Passing: [1, 2, 3] (#280)
#   Received: [1, 2, 3] (#300)

可以出于调试目的设置 Ractor 的 name

r = Ractor.new(name: 'my ractor') {}; r.take
p r
#=> #<Ractor:#3 my ractor test.rb:1 terminated>
# File ruby_3_4_1/ractor.rb, line 273
def self.new(*args, name: nil, &block)
  b = block # TODO: builtin bug
  raise ArgumentError, "must be called with a block" unless block
  if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
    warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
         "Also there are many implementation issues.", uplevel: 0, category: :experimental)
  end
  loc = caller_locations(1, 1).first
  loc = "#{loc.path}:#{loc.lineno}"
  __builtin_ractor_create(loc, name, args, b)
end
receive → msg 点击以切换源代码

从当前 Ractor 的传入端口接收一条消息(该消息由另一个 Ractor 的 send 发送到那里)。

r = Ractor.new do
  v1 = Ractor.receive
  puts "Received: #{v1}"
end
r.send('message1')
r.take
# Here will be printed: "Received: message1"

或者,可以使用私有实例方法 receive

r = Ractor.new do
  v1 = receive
  puts "Received: #{v1}"
end
r.send('message1')
r.take
# This prints: "Received: message1"

如果队列为空,则该方法将阻塞。

r = Ractor.new do
  puts "Before first receive"
  v1 = Ractor.receive
  puts "Received: #{v1}"
  v2 = Ractor.receive
  puts "Received: #{v2}"
end
wait
puts "Still not received"
r.send('message1')
wait
puts "Still received only one"
r.send('message2')
r.take

输出

Before first receive
Still not received
Received: message1
Still received only one
Received: message2

如果在 Ractor 上调用了 close_incoming,则如果传入队列中没有更多消息,该方法将引发 Ractor::ClosedError

Ractor.new do
  close_incoming
  receive
end
wait
# in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)
# File ruby_3_4_1/ractor.rb, line 430
def self.receive
  __builtin_cexpr! %q{
    ractor_receive(ec, rb_ec_ractor_ptr(ec))
  }
end
也别名为:recv
receive_if {|msg| block } → msg 点击以切换源代码

仅接收特定的消息。

可以使用 Ractor.receive_if 在代码块中给定一个模式(或任何过滤器),而不是 Ractor.receive,并且可以选择接受 Ractor 的传入队列中可用的消息。

r = Ractor.new do
  p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3"
  p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1"
  p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2"
end
r << "bar1"
r << "baz2"
r << "foo3"
r.take

这将输出

foo3
bar1
baz2

如果代码块返回真值,则该消息将从传入队列中删除并返回。否则,该消息将保留在传入队列中,并且给定的代码块将检查下一条消息。

如果传入队列中没有剩余的消息,该方法将阻塞,直到有新消息到达。

如果代码块通过 break/return/exception/throw 退出,则该消息将从传入队列中删除,就像返回了真值一样。

r = Ractor.new do
  val = Ractor.receive_if{|msg| msg.is_a?(Array)}
  puts "Received successfully: #{val}"
end

r.send(1)
r.send('test')
wait
puts "2 non-matching sent, nothing received"
r.send([1, 2, 3])
wait

打印

2 non-matching sent, nothing received
Received successfully: [1, 2, 3]

请注意,您不能在给定的代码块中递归调用 receive/receive_if。您不应在代码块中执行除消息过滤之外的任何任务。

Ractor.current << true
Ractor.receive_if{|msg| Ractor.receive}
#=> `receive': can not call receive/receive_if recursively (Ractor::Error)
# File ruby_3_4_1/ractor.rb, line 509
def self.receive_if &b
  Primitive.ractor_receive_if b
end
recv()
别名为:receive
select(*ractors, [yield_value:, move: false]) → [ractor or symbol, obj] 点击以切换源代码

等待任何 Ractor 的传出端口中有内容,从该 Ractor 读取,然后返回该 Ractor 和接收到的对象。

r1 = Ractor.new {Ractor.yield 'from 1'}
r2 = Ractor.new {Ractor.yield 'from 2'}

r, obj = Ractor.select(r1, r2)

puts "received #{obj.inspect} from #{r.inspect}"
# Prints: received "from 1" from #<Ractor:#2 test.rb:1 running>
# But could just as well print "from r2" here, either prints could be first.

如果给定的 Ractor 之一是当前 Ractor,并且被选中,则 r 将包含 :receive 符号而不是 Ractor 对象。

r1 = Ractor.new(Ractor.current) do |main|
  main.send 'to main'
  Ractor.yield 'from 1'
end
r2 = Ractor.new do
  Ractor.yield 'from 2'
end

r, obj = Ractor.select(r1, r2, Ractor.current)
puts "received #{obj.inspect} from #{r.inspect}"
# Could print: received "to main" from :receive

如果提供了 yield_value,则当另一个 Ractor 调用 take 时,可以产生该值。在这种情况下,将返回对 [:yield, nil]

r1 = Ractor.new(Ractor.current) do |main|
  puts "Received from main: #{main.take}"
end

puts "Trying to select"
r, obj = Ractor.select(r1, Ractor.current, yield_value: 123)
wait
puts "Received #{obj.inspect} from #{r.inspect}"

这将打印

Trying to select
Received from main: 123
Received nil from :yield

move 布尔标志定义产生的值将被复制(默认)还是移动。

# File ruby_3_4_1/ractor.rb, line 358
def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
  raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?

  if ractors.delete Ractor.current
    do_receive = true
  else
    do_receive = false
  end

  __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move
end
shareable?(obj) → true | false 点击以切换源代码

检查该对象是否可以被 Ractor 共享。

Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are frozen
Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
Ractor.shareable?('foo'.freeze) #=> true

另请参阅 Ractor 类文档中的“可共享和不可共享的对象”部分。

# File ruby_3_4_1/ractor.rb, line 785
def self.shareable? obj
  __builtin_cexpr! %q{
    RBOOL(rb_ractor_shareable_p(obj));
  }
end
store_if_absent(key){ init_block } 点击以切换源代码

如果未设置对应的值,则使用 init_block 生成一个值,并以线程安全的方式存储该值。此方法返回相应的存储值。

(1..10).map{
  Thread.new(it){|i|
    Ractor.store_if_absent(:s){ f(); i }
    #=> return stored value of key :s
  }
}.map(&:value).uniq.size #=> 1 and f() is called only once
# File ruby_3_4_1/ractor.rb, line 874
def self.store_if_absent(sym)
  Primitive.ractor_local_value_store_if_absent(sym)
end
yield(msg, move: false) → nil 点击以切换源代码

向当前 Ractor 的传出端口发送消息,以便被 take 接收。

r = Ractor.new {Ractor.yield 'Hello from ractor'}
puts r.take
# Prints: "Hello from ractor"

此方法是阻塞的,只有在有人消费了发送的消息后才会返回。

r = Ractor.new do
  Ractor.yield 'Hello from ractor'
  puts "Ractor: after yield"
end
wait
puts "Still not taken"
puts r.take

这将打印

Still not taken
Hello from ractor
Ractor: after yield

如果传出端口已使用 close_outgoing 关闭,则该方法将引发异常。

r = Ractor.new do
  close_outgoing
  Ractor.yield 'Hello from ractor'
end
wait
# `yield': The outgoing-port is already closed (Ractor::ClosedError)

move 参数的含义与 send 相同。

# File ruby_3_4_1/ractor.rb, line 643
def self.yield(obj, move: false)
  __builtin_cexpr! %q{
    ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
  }
end

公共实例方法

<<(obj, move: false)
别名:send
[](sym) 点击以切换源代码

从当前 Ractor 的 Ractor 本地存储中获取一个值。已过时,请使用 Ractor.[] 代替。

# File ruby_3_4_1/ractor.rb, line 840
def [](sym)
  Primitive.ractor_local_value(sym)
end
[]=(sym, val) 点击以切换源代码

在当前 Ractor 的 Ractor 本地存储中设置一个值。已过时,请使用 Ractor.[]= 代替。

# File ruby_3_4_1/ractor.rb, line 846
def []=(sym, val)
  Primitive.ractor_local_value_set(sym, val)
end
close_incoming → true | false 点击以切换源代码

关闭传入端口,并返回它是否已被关闭。所有后续在 Ractor 中调用 Ractor.receive 和向该 Ractor 发送 send 的尝试都会失败,并抛出 Ractor::ClosedError 异常。

r = Ractor.new {sleep(500)}
r.close_incoming  #=> false
r.close_incoming  #=> true
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
# File ruby_3_4_1/ractor.rb, line 750
def close_incoming
  __builtin_cexpr! %q{
    ractor_close_incoming(ec, RACTOR_PTR(self));
  }
end
close_outgoing → true | false 点击以切换源代码

关闭传出端口,并返回它是否已被关闭。所有后续在 Ractor 中调用 Ractor.yield 和从该 Ractor 接收 take 的尝试都会失败,并抛出 Ractor::ClosedError 异常。

r = Ractor.new {sleep(500)}
r.close_outgoing  #=> false
r.close_outgoing  #=> true
r.take
# Ractor::ClosedError (The outgoing-port is already closed)
# File ruby_3_4_1/ractor.rb, line 768
def close_outgoing
  __builtin_cexpr! %q{
    ractor_close_outgoing(ec, RACTOR_PTR(self));
  }
end
inspect() 点击以切换源代码
# File ruby_3_4_1/ractor.rb, line 716
def inspect
  loc  = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
  name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
  id   = __builtin_cexpr! %q{ UINT2NUM(rb_ractor_id(RACTOR_PTR(self))) }
  status = __builtin_cexpr! %q{
    rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_))
  }
  "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>"
end
也是别名:to_s
name() 点击以切换源代码

Ractor.new 中设置的名称,或 nil

# File ruby_3_4_1/ractor.rb, line 729
def name
  __builtin_cexpr! %q{RACTOR_PTR(self)->name}
end
recv()
别名:receive
send(msg, move: false) → self 点击以切换源代码

向 Ractor 的传入队列发送消息,以便被 Ractor.receive 接收。

r = Ractor.new do
  value = Ractor.receive
  puts "Received #{value}"
end
r.send 'message'
# Prints: "Received: message"

该方法是非阻塞的(即使 Ractor 没有准备好接收任何内容也会立即返回)

r = Ractor.new {sleep(5)}
r.send('test')
puts "Sent successfully"
# Prints: "Sent successfully" immediately

尝试向已完成执行的 Ractor 发送消息会引发 Ractor::ClosedError 异常。

r = Ractor.new {}
r.take
p r
# "#<Ractor:#6 (irb):23 terminated>"
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)

如果在 Ractor 上调用了 close_incoming,该方法也会引发 Ractor::ClosedError 异常。

r =  Ractor.new do
  sleep(500)
  receive
end
r.close_incoming
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
# The error is raised immediately, not when the ractor tries to receive

如果 obj 是不可共享的,默认情况下,它将通过深度克隆复制到接收 Ractor 中。如果传递了 move: true,则该对象将 *移动* 到接收 Ractor 中,并且发送者无法再访问它。

r = Ractor.new {puts "Received: #{receive}"}
msg = 'message'
r.send(msg, move: true)
r.take
p msg

这将打印

Received: message
in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>

对该对象及其部分的引用将对发送者失效。

r = Ractor.new {puts "Received: #{receive}"}
s = 'message'
ary = [s]
copy = ary.dup
r.send(ary, move: true)

s.inspect
# Ractor::MovedError (can not send any methods to a moved object)
ary.class
# Ractor::MovedError (can not send any methods to a moved object)
copy.class
# => Array, it is different object
copy[0].inspect
# Ractor::MovedError (can not send any methods to a moved object)
# ...but its item was still a reference to `s`, which was moved

如果该对象是可共享的,则 move: true 对它没有影响。

r = Ractor.new {puts "Received: #{receive}"}
s = 'message'.freeze
r.send(s, move: true)
s.inspect #=> "message", still available
# File ruby_3_4_1/ractor.rb, line 599
def send(obj, move: false)
  __builtin_cexpr! %q{
    ractor_send(ec, RACTOR_PTR(self), obj, move)
  }
end
也是别名:<<
take → msg 点击以切换源代码

从 Ractor 的传出端口获取消息,该消息由 Ractor.yield 或在 Ractor 终止时放置在那里。

r = Ractor.new do
  Ractor.yield 'explicit yield'
  'last value'
end
puts r.take #=> 'explicit yield'
puts r.take #=> 'last value'
puts r.take # Ractor::ClosedError (The outgoing-port is already closed)

最后的值也被发送到传出端口的事实意味着 take 可以用作 Thread#join 的类似物(“只需等待直到 Ractor 完成”)。但是,如果有人已经消费了该消息,则会引发异常。

如果传出端口已使用 close_outgoing 关闭,则该方法将引发 Ractor::ClosedError 异常。

r = Ractor.new do
  sleep(500)
  Ractor.yield 'Hello from ractor'
end
r.close_outgoing
r.take
# Ractor::ClosedError (The outgoing-port is already closed)
# The error would be raised immediately, not when ractor will try to receive

如果在 Ractor 中引发了未捕获的异常,它将通过 take 作为 Ractor::RemoteError 传播。

r = Ractor.new {raise "Something weird happened"}

begin
  r.take
rescue => e
  p e              #  => #<Ractor::RemoteError: thrown by remote Ractor.>
  p e.ractor == r  # => true
  p e.cause        # => #<RuntimeError: Something weird happened>
end

Ractor::ClosedErrorStopIteration 的后代,因此 Ractor 的终止将打破任何接收此消息的循环,而不会传播错误。

r = Ractor.new do
  3.times {|i| Ractor.yield "message #{i}"}
  "finishing"
end

loop {puts "Received: " + r.take}
puts "Continue successfully"

这将打印

Received: message 0
Received: message 1
Received: message 2
Received: finishing
Continue successfully
# File ruby_3_4_1/ractor.rb, line 710
def take
  __builtin_cexpr! %q{
    ractor_take(ec, RACTOR_PTR(self))
  }
end
to_s()
别名:inspect

私有实例方法

receive() 点击以切换源代码

Ractor.receive 相同

# File ruby_3_4_1/ractor.rb, line 441
        def receive
  __builtin_cexpr! %q{
    ractor_receive(ec, rb_ec_ractor_ptr(ec))
  }
end
也是别名:recv
receive_if(&b) 点击以切换源代码

Ractor.receive_if 相同

# File ruby_3_4_1/ractor.rb, line 514
        def receive_if &b
  Primitive.ractor_receive_if b
end