class Ractor
Ractor is an Actor-model abstraction for Ruby that provides thread-safe parallel execution.
Ractor.new creates a new Ractor, which can run in parallel.
    # The simplest ractor
    r = Ractor.new { puts "I am in Ractor!" }
    r.take # wait for it to finish
    # Here, "I am in Ractor!" is printed
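Ractors do not share all objects with each other. This has two main benefits: across ractors, thread-safety problems such as data races and race conditions cannot occur, and ractors can run in parallel.
To achieve this, object sharing between ractors is limited. For example, unlike threads, a ractor cannot access all the objects available in other ractors. Even objects that would normally be reachable through variables in the outer scope are prohibited from being used across ractors.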
    a = 1
    r = Ractor.new { puts "I am in Ractor! a=#{a}" }
    # fails immediately with
    # ArgumentError (can not isolate a Proc because it accesses outer variables (a).)
The object must be shared explicitly:

    a = 1
    r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}" }
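On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, that is, each ractor has its own lock, so ractors can run in parallel without locking each other. This is unlike the situation with threads on CRuby.
Instead of accessing shared state, objects should be passed to and from ractors by sending and receiving them as messages.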
    a = 1
    r = Ractor.new do
      a_in_ractor = receive # receive blocks until somebody passes a message
      puts "I am in Ractor! a=#{a_in_ractor}"
    end
    r.send(a)  # pass it
    r.take
    # Here, "I am in Ractor! a=1" is printed
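There are two pairs of methods for sending and receiving messages:
- Ractor#send and Ractor.receive, for when the sender knows the receiver (push);
- Ractor.yield and Ractor#take, for when the receiver knows the sender (pull).
In addition, any arguments passed to Ractor.new are passed to the block and are available there as if received by Ractor.receive, and the last block value is sent outside of the ractor as if sent by Ractor.yield.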
A classic ping-pong demo:

    server = Ractor.new(name: "server") do
      puts "Server starts: #{self.inspect}"
      puts "Server sends: ping"
      Ractor.yield 'ping'                  # The server doesn't know the receiver and sends to whoever is interested
      received = Ractor.receive            # The server doesn't know the sender and receives from whoever sent
      puts "Server received: #{received}"
    end

    client = Ractor.new(server) do |srv|   # The server is sent to the client, and available as srv
      puts "Client starts: #{self.inspect}"
      received = srv.take                  # The client takes a message from the server
      puts "Client received from " \
           "#{srv.inspect}: #{received}"
      puts "Client sends to " \
           "#{srv.inspect}: pong"
      srv.send 'pong'                      # The client sends a message to the server
    end

    [client, server].each(&:take)          # Wait until they both finish
This will output something like:

    Server starts: #<Ractor:#2 server test.rb:1 running>
    Server sends: ping
    Client starts: #<Ractor:#3 test.rb:8 running>
    Client received from #<Ractor:#2 server test.rb:1 blocking>: ping
    Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong
    Server received: pong
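Ractors receive their messages via the incoming port and send them via the outgoing port. Either one can be disabled with Ractor#close_incoming and Ractor#close_outgoing, respectively. When a ractor terminates, its ports are closed automatically.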
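Shareable and unshareable objects
When an object is sent to and from a ractor, it is important to understand whether the object is shareable or unshareable. Most Ruby objects are unshareable. Even frozen objects can be unshareable if they contain (through their instance variables) unfrozen objects.
Shareable objects are those that can be used by several threads without compromising thread safety, for example numbers, true and false. Ractor.shareable? lets you check this, and Ractor.make_shareable tries to make the object shareable if it is not already, raising an error if it cannot.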
    Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are shareable
    Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
    Ractor.shareable?('foo'.freeze) #=> true
    Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen

    ary = ['hello', 'world']
    ary.frozen?                 #=> false
    ary[0].frozen?              #=> false
    Ractor.make_shareable(ary)
    ary.frozen?                 #=> true
    ary[0].frozen?              #=> true
    ary[1].frozen?              #=> true
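When a shareable object is sent (via send or Ractor.yield), no additional processing occurs on it. It simply becomes usable by both ractors. When an unshareable object is sent, it can be either copied or moved. Copying is the default: the object is fully copied by deep cloning (Object#clone) the unshareable parts of its structure.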
    data = ['foo', 'bar'.freeze]
    r = Ractor.new do
      data2 = Ractor.receive
      puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
    end
    r.send(data)
    r.take
    puts "Outside : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"

This will output something like:

    In ractor: 340, 360, 320
    Outside : 380, 400, 320
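Note that the object ids of the array and of the non-frozen string inside it differ in the ractor, because they are different objects (copies). The array's second element, a shareable frozen string, is the same object in both places.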
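Deep cloning of objects may be slow, and sometimes impossible. Alternatively, move: true may be used when sending. This moves the unshareable object to the receiving ractor, making it inaccessible to the sending ractor.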
    data = ['foo', 'bar']
    r = Ractor.new do
      data_in_ractor = Ractor.receive
      puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}"
    end
    r.send(data, move: true)
    r.take
    puts "Outside: moved? #{Ractor::MovedObject === data}"
    puts "Outside: #{data.inspect}"

This will output:

    In ractor: 100, 120
    Outside: moved? true
    test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)
Note that even inspect (and more basic methods such as __id__) cannot be called on a moved object.
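Class and Module objects are shareable, so class and module definitions are shared between ractors. Ractor objects are also shareable. All operations on shareable objects are thread-safe, so the thread-safety property is preserved. Mutable shareable objects cannot be defined in Ruby, but C extensions can introduce them.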
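The shareability claims above can be checked directly with Ractor.shareable?. This is a small sketch; the variable names are illustrative only.

```ruby
# Classes, modules and Ractor objects are shareable as they are.
class_shareable  = Ractor.shareable?(String)
module_shareable = Ractor.shareable?(Kernel)
ractor_shareable = Ractor.shareable?(Ractor.current)

# An ordinary unfrozen instance is not.
instance_shareable = Ractor.shareable?(Object.new)

p [class_shareable, module_shareable, ractor_shareable, instance_shareable]
# prints [true, true, true, false]
```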
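It is prohibited to access (get) instance variables of shareable objects from other ractors if the values of those variables are not shareable. This can happen because modules and classes are shareable, yet they can have instance variables whose values are not. In non-main ractors it is also prohibited to set instance variables on classes and modules (even if the value is shareable).

    class C
      class << self
        attr_accessor :tricky
      end
    end

    C.tricky = "unshareable".dup

    r = Ractor.new(C) do |cls|
      puts "I see #{cls}"
      puts "I can't see #{cls.tricky}"
      cls.tricky = true # doesn't get here, but this would also raise an error
    end
    r.take
    # I see C
    # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)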
Ractors can access constants if they are shareable. The main ractor is the only one that can access unshareable constants.

    GOOD = 'good'.freeze
    BAD = 'bad'.dup

    r = Ractor.new do
      puts "GOOD=#{GOOD}"
      puts "BAD=#{BAD}"
    end
    r.take
    # GOOD=good
    # can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError)

    # Consider the same C class from above

    r = Ractor.new do
      puts "I see #{C}"
      puts "I can't see #{C.tricky}"
    end
    r.take
    # I see C
    # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
See also the description of the # shareable_constant_value pragma in the Comments syntax explanation.
Ractors vs. threads
Each ractor has its own main Thread. New threads can be created from inside a ractor (and, on CRuby, they share the GVL with the other threads of that ractor).

    r = Ractor.new do
      a = 1
      Thread.new { puts "Thread in ractor: a=#{a}" }.join
    end
    r.take
    # Here "Thread in ractor: a=1" will be printed
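Note on code examples
In the examples below, we sometimes use the following method to wait for ractors that are not currently blocked to finish (or to make progress).

    def wait
      sleep(0.1)
    end

This is **only for demonstration purposes** and should not be used in real code. Most of the time, take is used to wait for ractors to finish.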
Reference
See the Ractor design doc for more details.
Public Class Methods
Ractor.count returns the number of ractors currently running or blocking (waiting).

    Ractor.count                   #=> 1
    r = Ractor.new(name: 'example') { Ractor.yield(1) }
    Ractor.count                   #=> 2 (main + example ractor)
    r.take                         # wait for Ractor.yield(1)
    r.take                         # wait until r finishes
    Ractor.count                   #=> 1

    # File ruby_3_3_0/ractor.rb, line 302
    def self.count
      __builtin_cexpr! %q{
        ULONG2NUM(GET_VM()->ractor.cnt);
      }
    end
Ractor.current returns the currently executing Ractor.

    Ractor.current #=> #<Ractor:#1 running>

    # File ruby_3_3_0/ractor.rb, line 288
    def self.current
      __builtin_cexpr! %q{
        rb_ractor_self(rb_ec_ractor_ptr(ec));
      }
    end
Ractor.main returns the main ractor.

    # File ruby_3_3_0/ractor.rb, line 848
    def self.main
      __builtin_cexpr! %q{
        rb_ractor_self(GET_VM()->ractor.main_ractor);
      }
    end
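Ractor.new creates a new Ractor with the given args and block.
The given block (Proc) will be isolated (it cannot access any outer variables). self inside the block refers to the current Ractor.

    r = Ractor.new { puts "Hi, I am #{self.inspect}" }
    r.take
    # Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"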
Any args passed are propagated to the block arguments by the same rules as objects sent via send/Ractor.receive. If an argument in args is not shareable, it is copied (via deep cloning, which may be inefficient).

    arg = [1, 2, 3]
    puts "Passing: #{arg} (##{arg.object_id})"
    r = Ractor.new(arg) { |received_arg|
      puts "Received: #{received_arg} (##{received_arg.object_id})"
    }
    r.take
    # Prints:
    #   Passing: [1, 2, 3] (#280)
    #   Received: [1, 2, 3] (#300)
A ractor's name can be set for debugging purposes:

    r = Ractor.new(name: 'my ractor') {}; r.take
    p r
    #=> #<Ractor:#3 my ractor test.rb:1 terminated>

    # File ruby_3_3_0/ractor.rb, line 273
    def self.new(*args, name: nil, &block)
      b = block # TODO: builtin bug
      raise ArgumentError, "must be called with a block" unless block
      if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
        warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
             "Also there are many implementation issues.", uplevel: 0, category: :experimental)
      end
      loc = caller_locations(1, 1).first
      loc = "#{loc.path}:#{loc.lineno}"
      __builtin_ractor_create(loc, name, args, b)
    end
Ractor.receive receives a message from the incoming port of the current ractor (the message was sent there by send from another ractor).

    r = Ractor.new do
      v1 = Ractor.receive
      puts "Received: #{v1}"
    end
    r.send('message1')
    r.take
    # Here will be printed: "Received: message1"

Alternatively, the private instance method receive may be used:

    r = Ractor.new do
      v1 = receive
      puts "Received: #{v1}"
    end
    r.send('message1')
    r.take
    # This prints: "Received: message1"
The method blocks if the queue is empty.

    r = Ractor.new do
      puts "Before first receive"
      v1 = Ractor.receive
      puts "Received: #{v1}"
      v2 = Ractor.receive
      puts "Received: #{v2}"
    end
    wait
    puts "Still not received"
    r.send('message1')
    wait
    puts "Still received only one"
    r.send('message2')
    r.take

Output:

    Before first receive
    Still not received
    Received: message1
    Still received only one
    Received: message2
If close_incoming was called on the ractor and there are no more messages in the incoming queue, the method raises Ractor::ClosedError:

    Ractor.new do
      close_incoming
      receive
    end
    wait
    # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)

    # File ruby_3_3_0/ractor.rb, line 430
    def self.receive
      __builtin_cexpr! %q{
        ractor_receive(ec, rb_ec_ractor_ptr(ec))
      }
    end
Ractor.receive_if receives only a specific message.
Unlike Ractor.receive, Ractor.receive_if takes a pattern (or any filter) as a block, so you can choose which of the messages available in the ractor's incoming queue to accept.

    r = Ractor.new do
      p Ractor.receive_if { |msg| msg.match?(/foo/) } #=> "foo3"
      p Ractor.receive_if { |msg| msg.match?(/bar/) } #=> "bar1"
      p Ractor.receive_if { |msg| msg.match?(/baz/) } #=> "baz2"
    end
    r << "bar1"
    r << "baz2"
    r << "foo3"
    r.take

This will output:

    foo3
    bar1
    baz2
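If the block returns a truthy value, the message is removed from the incoming queue and returned. Otherwise, the message remains in the incoming queue and the next message is checked by the given block.
If there are no messages left in the incoming queue, the method blocks until new messages arrive.
If the block is exited by break/return/exception/throw, the message is removed from the incoming queue as if a truthy value had been returned.

    r = Ractor.new do
      val = Ractor.receive_if { |msg| msg.is_a?(Array) }
      puts "Received successfully: #{val}"
    end

    r.send(1)
    r.send('test')
    wait
    puts "2 non-matching sent, nothing received"
    r.send([1, 2, 3])
    wait

Prints:

    2 non-matching sent, nothing received
    Received successfully: [1, 2, 3]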
Note that you cannot call receive/receive_if recursively inside the given block. You should not do anything in the block other than filter messages.

    Ractor.current << true
    Ractor.receive_if { |msg| Ractor.receive }
    #=> `receive': can not call receive/receive_if recursively (Ractor::Error)

    # File ruby_3_3_0/ractor.rb, line 509
    def self.receive_if &b
      Primitive.ractor_receive_if b
    end
Ractor.select waits for any of the given ractors to have something in its outgoing port, reads from that ractor, and then returns the ractor and the object received.

    r1 = Ractor.new { Ractor.yield 'from 1' }
    r2 = Ractor.new { Ractor.yield 'from 2' }

    r, obj = Ractor.select(r1, r2)

    puts "received #{obj.inspect} from #{r.inspect}"
    # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running>
    # But could just as well print "from 2" here; either could arrive first.

If one of the given ractors is the current ractor and it is selected, r will contain the :receive symbol instead of the ractor object.

    r1 = Ractor.new(Ractor.current) do |main|
      main.send 'to main'
      Ractor.yield 'from 1'
    end
    r2 = Ractor.new do
      Ractor.yield 'from 2'
    end

    r, obj = Ractor.select(r1, r2, Ractor.current)
    puts "received #{obj.inspect} from #{r.inspect}"
    # Could print: received "to main" from :receive
If yield_value is provided, that value may be yielded if another ractor is calling take. In this case, the pair [:yield, nil] is returned.

    r1 = Ractor.new(Ractor.current) do |main|
      puts "Received from main: #{main.take}"
    end

    puts "Trying to select"
    r, obj = Ractor.select(r1, Ractor.current, yield_value: 123)
    wait
    puts "Received #{obj.inspect} from #{r.inspect}"

This will print:

    Trying to select
    Received from main: 123
    Received nil from :yield
The move boolean flag defines whether the yielded value is copied (the default) or moved.

    # File ruby_3_3_0/ractor.rb, line 358
    def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
      raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?

      if ractors.delete Ractor.current
        do_receive = true
      else
        do_receive = false
      end

      __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move
    end
Ractor.yield sends a message to the current ractor's outgoing port, to be accepted by take.

    r = Ractor.new { Ractor.yield 'Hello from ractor' }
    puts r.take
    # Prints: "Hello from ractor"

This method is blocking and returns only when somebody consumes the sent message.

    r = Ractor.new do
      Ractor.yield 'Hello from ractor'
      puts "Ractor: after yield"
    end
    wait
    puts "Still not taken"
    puts r.take

This will print:

    Still not taken
    Hello from ractor
    Ractor: after yield
If the outgoing port was closed with close_outgoing, the method raises Ractor::ClosedError:

    r = Ractor.new do
      close_outgoing
      Ractor.yield 'Hello from ractor'
    end
    wait
    # `yield': The outgoing-port is already closed (Ractor::ClosedError)

The meaning of the move argument is the same as for send.

    # File ruby_3_3_0/ractor.rb, line 643
    def self.yield(obj, move: false)
      __builtin_cexpr! %q{
        ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
      }
    end
Public Instance Methods
[] gets a value from the ractor-local storage.

    # File ruby_3_3_0/ractor.rb, line 838
    def [](sym)
      Primitive.ractor_local_value(sym)
    end

[]= sets a value in the ractor-local storage.

    # File ruby_3_3_0/ractor.rb, line 843
    def []=(sym, val)
      Primitive.ractor_local_value_set(sym, val)
    end
close_incoming closes the incoming port and returns whether it was already closed. All further attempts to Ractor.receive in the ractor, and to send to the ractor, will fail with Ractor::ClosedError.

    r = Ractor.new { sleep(500) }
    r.close_incoming  #=> false
    r.close_incoming  #=> true
    r.send('test')
    # Ractor::ClosedError (The incoming-port is already closed)

    # File ruby_3_3_0/ractor.rb, line 749
    def close_incoming
      __builtin_cexpr! %q{
        ractor_close_incoming(ec, RACTOR_PTR(self));
      }
    end
close_outgoing closes the outgoing port and returns whether it was already closed. All further attempts to Ractor.yield in the ractor, and to take from the ractor, will fail with Ractor::ClosedError.

    r = Ractor.new { sleep(500) }
    r.close_outgoing  #=> false
    r.close_outgoing  #=> true
    r.take
    # Ractor::ClosedError (The outgoing-port is already closed)

    # File ruby_3_3_0/ractor.rb, line 767
    def close_outgoing
      __builtin_cexpr! %q{
        ractor_close_outgoing(ec, RACTOR_PTR(self));
      }
    end
    # File ruby_3_3_0/ractor.rb, line 716
    def inspect
      loc  = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
      name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
      id   = __builtin_cexpr! %q{ UINT2NUM(rb_ractor_id(RACTOR_PTR(self))) }
      status = __builtin_cexpr! %q{
        rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_))
      }
      "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>"
    end
name returns the name set in Ractor.new, or nil.

    # File ruby_3_3_0/ractor.rb, line 729
    def name
      __builtin_cexpr! %q{RACTOR_PTR(self)->name}
    end
send sends a message to the ractor's incoming queue, to be accepted by Ractor.receive.

    r = Ractor.new do
      value = Ractor.receive
      puts "Received #{value}"
    end
    r.send 'message'
    # Prints: "Received message"

The method is non-blocking (it returns immediately, even if the ractor is not ready to receive anything).

    r = Ractor.new { sleep(5) }
    r.send('test')
    puts "Sent successfully"
    # Prints: "Sent successfully" immediately
An attempt to send to a ractor that has already finished its execution raises Ractor::ClosedError.

    r = Ractor.new {}
    r.take
    p r
    # "#<Ractor:#6 (irb):23 terminated>"
    r.send('test')
    # Ractor::ClosedError (The incoming-port is already closed)

If close_incoming was called on the ractor, the method also raises Ractor::ClosedError.

    r = Ractor.new do
      sleep(500)
      receive
    end
    r.close_incoming
    r.send('test')
    # Ractor::ClosedError (The incoming-port is already closed)
    # The error is raised immediately, not when the ractor tries to receive
If obj is unshareable, by default it is copied into the receiving ractor by deep cloning. If move: true is passed, the object is moved into the receiving ractor and becomes inaccessible to the sender.

    r = Ractor.new { puts "Received: #{receive}" }
    msg = 'message'
    r.send(msg, move: true)
    r.take
    p msg

This prints:

    Received: message
    in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>
All references held by the sender to the object and its parts become invalid.

    r = Ractor.new { puts "Received: #{receive}" }
    s = 'message'
    ary = [s]
    copy = ary.dup
    r.send(ary, move: true)

    s.inspect
    # Ractor::MovedError (can not send any methods to a moved object)
    ary.class
    # Ractor::MovedError (can not send any methods to a moved object)
    copy.class
    # => Array, it is a different object
    copy[0].inspect
    # Ractor::MovedError (can not send any methods to a moved object)
    # ...but its item was still a reference to `s`, which was moved

If the object is shareable, move: true has no effect on it:

    r = Ractor.new { puts "Received: #{receive}" }
    s = 'message'.freeze
    r.send(s, move: true)
    s.inspect #=> "message", still available

    # File ruby_3_3_0/ractor.rb, line 599
    def send(obj, move: false)
      __builtin_cexpr! %q{
        ractor_send(ec, RACTOR_PTR(self), obj, move)
      }
    end
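take gets a message from the ractor's outgoing port, which was put there by Ractor.yield or at the ractor's termination.

    r = Ractor.new do
      Ractor.yield 'explicit yield'
      'last value'
    end
    puts r.take #=> 'explicit yield'
    puts r.take #=> 'last value'
    puts r.take # Ractor::ClosedError (The outgoing-port is already closed)

The fact that the last value is also sent to the outgoing port means that take can be used as an analog of Thread#join ("just wait until the ractor finishes"). However, it raises if somebody has already consumed that message.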
If the outgoing port was closed with close_outgoing, the method raises Ractor::ClosedError.

    r = Ractor.new do
      sleep(500)
      Ractor.yield 'Hello from ractor'
    end
    r.close_outgoing
    r.take
    # Ractor::ClosedError (The outgoing-port is already closed)
    # The error is raised immediately, not when the ractor tries to yield
If an uncaught exception is raised in the Ractor, it is propagated by take as a Ractor::RemoteError.

    r = Ractor.new { raise "Something weird happened" }

    begin
      r.take
    rescue => e
      p e              # => #<Ractor::RemoteError: thrown by remote Ractor.>
      p e.ractor == r  # => true
      p e.cause        # => #<RuntimeError: Something weird happened>
    end
Ractor::ClosedError is a descendant of StopIteration, so the termination of the ractor breaks out of any loop that receives this message, without propagating the error:

    r = Ractor.new do
      3.times { |i| Ractor.yield "message #{i}" }
      "finishing"
    end

    loop { puts "Received: " + r.take }
    puts "Continue successfully"

This will print:

    Received: message 0
    Received: message 1
    Received: message 2
    Received: finishing
    Continue successfully
    # File ruby_3_3_0/ractor.rb, line 710
    def take
      __builtin_cexpr! %q{
        ractor_take(ec, RACTOR_PTR(self))
      }
    end
Private Instance Methods
receive is the same as Ractor.receive.

    # File ruby_3_3_0/ractor.rb, line 441
    def receive
      __builtin_cexpr! %q{
        ractor_receive(ec, rb_ec_ractor_ptr(ec))
      }
    end

receive_if is the same as Ractor.receive_if.

    # File ruby_3_3_0/ractor.rb, line 514
    def receive_if &b
      Primitive.ractor_receive_if b
    end