class Ractor
Ractor is an Actor-model abstraction for Ruby that provides thread-safe parallel execution.
Ractor.new makes a new Ractor, which can run in parallel.
```ruby
# The simplest ractor
r = Ractor.new {puts "I am in Ractor!"}
r.take # wait for it to finish
# Here, "I am in Ractor!" is printed
```
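Ractors do not share all of their objects with each other. This has two main benefits. First, thread-safety problems such as data races and race conditions cannot occur across ractors. Second, it enables parallelism.
To achieve this, object sharing across ractors is limited. For example, unlike threads, ractors can't access all the objects available in other ractors. Even objects that would normally be available through variables in the outer scope are prohibited from being used across ractors.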
```ruby
a = 1
r = Ractor.new {puts "I am in Ractor! a=#{a}"}
# fails immediately with
# ArgumentError (can not isolate a Proc because it accesses outer variables (a).)
```
Objects must be passed to the ractor explicitly:
```ruby
a = 1
r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"}
```
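On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, so ractors can run in parallel without locking each other. This is unlike threads on CRuby, which share a single GVL within a ractor.
Instead of accessing shared state, objects should be passed to and from ractors by sending and receiving them as messages.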
```ruby
a = 1
r = Ractor.new do
  a_in_ractor = receive # receive blocks until somebody passes a message
  puts "I am in Ractor! a=#{a_in_ractor}"
end
r.send(a)  # pass it
r.take
# Here, "I am in Ractor! a=1" is printed
```
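There are two pairs of methods for sending/receiving messages:
- Ractor#send and Ractor.receive, for when the sender knows the receiver (push);
- Ractor.yield and Ractor#take, for when the receiver knows the sender (pull).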
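In addition, any arguments passed to Ractor.new are passed to the block and are available there as if received by Ractor.receive, and the last value of the block is sent outside the ractor as if sent by Ractor.yield.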
A classic ping-pong example:
```ruby
server = Ractor.new(name: "server") do
  puts "Server starts: #{self.inspect}"
  puts "Server sends: ping"
  Ractor.yield 'ping'       # The server doesn't know the receiver and sends to whoever is interested
  received = Ractor.receive # The server doesn't know the sender and receives from whoever sent
  puts "Server received: #{received}"
end

client = Ractor.new(server) do |srv| # The server is sent to the client, and available as srv
  puts "Client starts: #{self.inspect}"
  received = srv.take                # The client takes a message from the server
  puts "Client received from " \
       "#{srv.inspect}: #{received}"
  puts "Client sends to " \
       "#{srv.inspect}: pong"
  srv.send 'pong'                    # The client sends a message to the server
end

[client, server].each(&:take)        # Wait until they both finish
```
This will output something like:
```
Server starts: #<Ractor:#2 server test.rb:1 running>
Server sends: ping
Client starts: #<Ractor:#3 test.rb:8 running>
Client received from #<Ractor:#2 server test.rb:1 blocking>: ping
Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong
Server received: pong
```
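Ractors receive messages via their incoming port and send messages via their outgoing port. Either port can be disabled with Ractor#close_incoming and Ractor#close_outgoing, respectively. When a ractor terminates, its ports are closed automatically.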
Shareable and unshareable objects
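When an object is sent to or from a ractor, it is important to understand whether the object is shareable or unshareable. Most Ruby objects are unshareable. Even frozen objects can be unshareable if they contain unfrozen objects (through their instance variables).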
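Shareable objects are those that can be used by several threads without compromising thread safety, for example numbers, true, and false. Ractor.shareable? lets you check this, and Ractor.make_shareable tries to make an object shareable if it isn't already, raising an error if it can't.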
```ruby
Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are shareable
Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
Ractor.shareable?('foo'.freeze) #=> true
Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen

ary = ['hello', 'world']
ary.frozen?                 #=> false
ary[0].frozen?              #=> false
Ractor.make_shareable(ary)
ary.frozen?                 #=> true
ary[0].frozen?              #=> true
ary[1].frozen?              #=> true
```
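The same deep freezing applies to nested structures. Here is a minimal sketch: `config` is a hypothetical Hash that holds an Array of Strings. Ractor.make_shareable freezes every level in place and returns the same object it was given.

```ruby
# Hypothetical nested structure: a Hash holding an Array of Strings.
config = { name: 'app', tags: ['fast', 'safe'] }

before = Ractor.shareable?(config)  # false: nothing is frozen yet

# make_shareable deep-freezes the structure and returns the same object.
result = Ractor.make_shareable(config)

same_object = result.equal?(config)
after = Ractor.shareable?(config)
tags_frozen = config[:tags].frozen? && config[:tags].all?(&:frozen?)
```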
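When a shareable object is sent (via send or Ractor.yield), no additional processing happens. It simply becomes usable by both ractors. When an unshareable object is sent, it can be either copied or moved. Copying is the default: the object is copied fully by deep cloning (Object#clone) the unshareable parts of its structure.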
```ruby
data = ['foo', 'bar'.freeze]
r = Ractor.new do
  data2 = Ractor.receive
  puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
end
r.send(data)
r.take
puts "Outside  : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"
```
This will output something like:
```
In ractor: 340, 360, 320
Outside  : 380, 400, 320
```
Note that the object IDs of the array and of the unfrozen string inside it have changed in the ractor, because they are different objects. The array's second element, a shareable frozen string, is the same object.
Deep cloning an object may be slow, and sometimes it is impossible. Alternatively, move: true can be used when sending. This moves the unshareable object to the receiving ractor, making it inaccessible to the sending ractor.
```ruby
data = ['foo', 'bar']
r = Ractor.new do
  data_in_ractor = Ractor.receive
  puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}"
end
r.send(data, move: true)
r.take
puts "Outside: moved? #{Ractor::MovedObject === data}"
puts "Outside: #{data.inspect}"
```
This will output:
```
In ractor: 100, 120
Outside: moved? true
test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)
```
Note that even inspect (and more basic methods like __id__) cannot be called on a moved object.
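Class and Module objects are shareable, so class/module definitions are shared between ractors. Ractor objects are also shareable. All operations on shareable objects are thread-safe, so the thread-safety property is preserved. Mutable shareable objects cannot be defined in Ruby, but C extensions can introduce them.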
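Getting the instance variables of shareable objects from other ractors is prohibited if the values of those variables aren't shareable. This can happen because modules and classes are shareable, but they can have instance variables whose values are not. In non-main ractors, setting instance variables on classes/modules is also prohibited (even if the value is shareable).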
```ruby
class C
  class << self
    attr_accessor :tricky
  end
end

C.tricky = "unshareable".dup

r = Ractor.new(C) do |cls|
  puts "I see #{cls}"
  puts "I can't see #{cls.tricky}"
  cls.tricky = true # doesn't get here, but this would also raise an error
end
r.take
# I see C
# can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
```
Ractors can access constants only if their values are shareable. The main Ractor is the only one that can access non-shareable constants.
```ruby
GOOD = 'good'.freeze
BAD = 'bad'.dup

r = Ractor.new do
  puts "GOOD=#{GOOD}"
  puts "BAD=#{BAD}"
end
r.take
# GOOD=good
# can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError)

# Consider the same C class from above

r = Ractor.new do
  puts "I see #{C}"
  puts "I can't see #{C.tricky}"
end
r.take
# I see C
# can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
```
See also the description of the # shareable_constant_value pragma in the Comments syntax explanation.
Ractors vs threads
Each ractor has its own main Thread. New threads can be created from inside a ractor (on CRuby, they share the GVL with the other threads of that ractor).
```ruby
r = Ractor.new do
  a = 1
  Thread.new {puts "Thread in ractor: a=#{a}"}.join
end
r.take
# Here "Thread in ractor: a=1" will be printed
```
Note on code examples
In the examples below, we sometimes use the following method to wait for ractors that are not currently blocked to finish (or to make progress).
```ruby
def wait
  sleep(0.1)
end
```
It is only for demonstration purposes and should not be used in real code. Most of the time, take is used to wait for ractors to finish.
Reference
See the Ractor design doc for more details.
Public Class Methods
Ractor[sym]
Gets a value from the Ractor-local storage of the current Ractor.
```ruby
# File ruby_3_4_1/ractor.rb, line 851
def self.[](sym)
  Primitive.ractor_local_value(sym)
end
```
Ractor[sym] = val
Sets a value in the Ractor-local storage of the current Ractor.
```ruby
# File ruby_3_4_1/ractor.rb, line 856
def self.[]=(sym, val)
  Primitive.ractor_local_value_set(sym, val)
end
```
Ractor.count
Returns the number of ractors currently running or blocking (waiting).
```ruby
Ractor.count                   #=> 1
r = Ractor.new(name: 'example') { Ractor.yield(1) }
Ractor.count                   #=> 2 (main + example ractor)
r.take        # wait for Ractor.yield(1)
r.take        # wait until r finishes
Ractor.count                   #=> 1
```
```ruby
# File ruby_3_4_1/ractor.rb, line 302
def self.count
  __builtin_cexpr! %q{
    ULONG2NUM(GET_VM()->ractor.cnt);
  }
end
```
Ractor.current
Returns the currently executing Ractor.
```ruby
Ractor.current #=> #<Ractor:#1 running>
```
```ruby
# File ruby_3_4_1/ractor.rb, line 288
def self.current
  __builtin_cexpr! %q{
    rb_ractor_self(rb_ec_ractor_ptr(ec));
  }
end
```
Ractor.main
Returns the main ractor.
```ruby
# File ruby_3_4_1/ractor.rb, line 879
def self.main
  __builtin_cexpr! %q{
    rb_ractor_self(GET_VM()->ractor.main_ractor);
  }
end
```
Ractor.main?
Returns true if the current ractor is the main ractor.
```ruby
# File ruby_3_4_1/ractor.rb, line 886
def self.main?
  __builtin_cexpr! %q{
    RBOOL(GET_VM()->ractor.main_ractor == rb_ec_ractor_ptr(ec))
  }
end
```
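Ractor.new(*args, name: nil) { |*args| ... }
Creates a new Ractor with args and a block.
The given block (Proc) will be isolated (it can't access any outer variables). self inside the block refers to the current Ractor.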
```ruby
r = Ractor.new { puts "Hi, I am #{self.inspect}" }
r.take
# Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"
```
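Any args passed are propagated to the block arguments by the same rules as objects sent via send/Ractor.receive. If an argument in args is not shareable, it will be copied (via deep cloning, which might be inefficient).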
```ruby
arg = [1, 2, 3]
puts "Passing: #{arg} (##{arg.object_id})"
r = Ractor.new(arg) {|received_arg|
  puts "Received: #{received_arg} (##{received_arg.object_id})"
}
r.take
# Prints:
#   Passing: [1, 2, 3] (#280)
#   Received: [1, 2, 3] (#300)
```
A ractor's name can be set for debugging purposes:
```ruby
r = Ractor.new(name: 'my ractor') {}; r.take
p r
#=> #<Ractor:#3 my ractor test.rb:1 terminated>
```
```ruby
# File ruby_3_4_1/ractor.rb, line 273
def self.new(*args, name: nil, &block)
  b = block # TODO: builtin bug
  raise ArgumentError, "must be called with a block" unless block
  if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
    warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
         "Also there are many implementation issues.", uplevel: 0, category: :experimental)
  end
  loc = caller_locations(1, 1).first
  loc = "#{loc.path}:#{loc.lineno}"
  __builtin_ractor_create(loc, name, args, b)
end
```
Ractor.receive
Receives a message from the incoming port of the current ractor (which was sent there by send from another ractor).
```ruby
r = Ractor.new do
  v1 = Ractor.receive
  puts "Received: #{v1}"
end
r.send('message1')
r.take
# Here will be printed: "Received: message1"
```
Alternatively, the private instance method receive may be used:
```ruby
r = Ractor.new do
  v1 = receive
  puts "Received: #{v1}"
end
r.send('message1')
r.take
# This prints: "Received: message1"
```
The method blocks if the queue is empty.
```ruby
r = Ractor.new do
  puts "Before first receive"
  v1 = Ractor.receive
  puts "Received: #{v1}"
  v2 = Ractor.receive
  puts "Received: #{v2}"
end
wait
puts "Still not received"
r.send('message1')
wait
puts "Still received only one"
r.send('message2')
r.take
```
Output:
```
Before first receive
Still not received
Received: message1
Still received only one
Received: message2
```
If close_incoming was called on the ractor, the method raises Ractor::ClosedError when there are no more messages in the incoming queue:
```ruby
Ractor.new do
  close_incoming
  receive
end
wait
# in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)
```
```ruby
# File ruby_3_4_1/ractor.rb, line 430
def self.receive
  __builtin_cexpr! %q{
    ractor_receive(ec, rb_ec_ractor_ptr(ec))
  }
end
```
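Ractor.receive_if { |msg| ... }
Receives only a specific message.
Instead of Ractor.receive, Ractor.receive_if can be given a pattern (or any filter) in a block, letting you choose which of the messages available in the ractor's incoming queue to accept.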
```ruby
r = Ractor.new do
  p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3"
  p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1"
  p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2"
end
r << "bar1"
r << "baz2"
r << "foo3"
r.take
```
This will output:
```
foo3
bar1
baz2
```
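If the block returns a truthy value, the message is removed from the incoming queue and returned. Otherwise, the message remains in the incoming queue and the given block checks the next message.
If there are no messages left in the incoming queue, the method blocks until new messages arrive.
If the block is exited via break/return/exception/throw, the message is removed from the incoming queue as if a truthy value had been returned.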
```ruby
r = Ractor.new do
  val = Ractor.receive_if{|msg| msg.is_a?(Array)}
  puts "Received successfully: #{val}"
end

r.send(1)
r.send('test')
wait
puts "2 non-matching sent, nothing received"
r.send([1, 2, 3])
wait
```
Prints:
```
2 non-matching sent, nothing received
Received successfully: [1, 2, 3]
```
Note that you cannot call receive/receive_if recursively inside the given block. You should not perform any task in the block other than message filtering.
```ruby
Ractor.current << true
Ractor.receive_if{|msg| Ractor.receive}
#=> `receive': can not call receive/receive_if recursively (Ractor::Error)
```
```ruby
# File ruby_3_4_1/ractor.rb, line 509
def self.receive_if &b
  Primitive.ractor_receive_if b
end
```
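Ractor.select(*ractors, yield_value: ..., move: false)
Waits for any ractor to have something in its outgoing port, reads from that ractor, and then returns that ractor and the object received.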
```ruby
r1 = Ractor.new {Ractor.yield 'from 1'}
r2 = Ractor.new {Ractor.yield 'from 2'}

r, obj = Ractor.select(r1, r2)

puts "received #{obj.inspect} from #{r.inspect}"
# Prints: received "from 1" from #<Ractor:#2 test.rb:1 running>
# But it could just as well print "from 2": either message may arrive first.
```
If one of the given ractors is the current ractor and it is selected, r will contain the :receive symbol instead of the ractor object.
```ruby
r1 = Ractor.new(Ractor.current) do |main|
  main.send 'to main'
  Ractor.yield 'from 1'
end
r2 = Ractor.new do
  Ractor.yield 'from 2'
end

r, obj = Ractor.select(r1, r2, Ractor.current)
puts "received #{obj.inspect} from #{r.inspect}"
# Could print: received "to main" from :receive
```
If yield_value is provided, that value may be yielded if another ractor calls take. In this case, the pair [:yield, nil] is returned:
```ruby
r1 = Ractor.new(Ractor.current) do |main|
  puts "Received from main: #{main.take}"
end

puts "Trying to select"
r, obj = Ractor.select(r1, Ractor.current, yield_value: 123)
wait
puts "Received #{obj.inspect} from #{r.inspect}"
```
This will print:
```
Trying to select
Received from main: 123
Received nil from :yield
```
The move boolean flag defines whether yielded values will be copied (the default) or moved.
```ruby
# File ruby_3_4_1/ractor.rb, line 358
def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
  raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?

  if ractors.delete Ractor.current
    do_receive = true
  else
    do_receive = false
  end

  __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move
end
```
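A typical use of select is gathering results from several workers as each one finishes, in whatever order they complete. This is a minimal sketch. Each worker's last value lands in its outgoing port, and a worker is dropped from the list once its result has been read. The results are sorted because completion order is not deterministic.

```ruby
workers = (1..3).map do |i|
  Ractor.new(i) { |n| n * 10 }   # last value goes to the outgoing port
end

results = []
until workers.empty?
  r, value = Ractor.select(*workers)  # whichever worker is ready first
  workers.delete(r)
  results << value
end
```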
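Ractor.store_if_absent(sym) { ... }
If the corresponding value is not set, yields to init_block to produce a value and stores it in a thread-safe manner. This method returns the corresponding stored value.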
```ruby
(1..10).map{
  Thread.new(it){|i|
    Ractor.store_if_absent(:s){ f(); i }
    #=> return stored value of key :s
  }
}.map(&:value).uniq.size #=> 1 and f() is called only once
```
```ruby
# File ruby_3_4_1/ractor.rb, line 874
def self.store_if_absent(sym)
  Primitive.ractor_local_value_store_if_absent(sym)
end
```
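This is a self-contained variant of the example above that runs without the undefined `f()`. It requires Ruby 3.4, which provides Ractor.store_if_absent. The key `:config` and the counter `init_calls` are hypothetical names. Several threads race to initialize the same key, and only one block runs.

```ruby
init_calls = 0

threads = Array.new(4) do |i|
  Thread.new(i) do |n|
    # Only the first thread to get here runs the block; the rest get its value.
    Ractor.store_if_absent(:config) do
      init_calls += 1
      "initialized by thread #{n}"
    end
  end
end

values = threads.map(&:value)
```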
Ractor.yield(obj, move: false)
Sends a message to the current ractor's outgoing port, to be accepted by take.
```ruby
r = Ractor.new {Ractor.yield 'Hello from ractor'}
puts r.take
# Prints: "Hello from ractor"
```
This method is blocking and returns only when somebody consumes the sent message.
```ruby
r = Ractor.new do
  Ractor.yield 'Hello from ractor'
  puts "Ractor: after yield"
end
wait
puts "Still not taken"
puts r.take
```
This will print:
```
Still not taken
Hello from ractor
Ractor: after yield
```
If the outgoing port was closed with close_outgoing, the method raises:
```ruby
r = Ractor.new do
  close_outgoing
  Ractor.yield 'Hello from ractor'
end
wait
# `yield': The outgoing-port is already closed (Ractor::ClosedError)
```
The move argument has the same meaning as for send.
```ruby
# File ruby_3_4_1/ractor.rb, line 643
def self.yield(obj, move: false)
  __builtin_cexpr! %q{
    ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
  }
end
```
Public Instance Methods
ractor[sym] = val
Sets a value in the Ractor-local storage of the current Ractor. Obsolete; use Ractor.[]= instead.
```ruby
# File ruby_3_4_1/ractor.rb, line 846
def []=(sym, val)
  Primitive.ractor_local_value_set(sym, val)
end
```
close_incoming
Closes the incoming port and returns whether it was already closed. All further attempts to call Ractor.receive in the ractor, or to send to the ractor, will fail with Ractor::ClosedError.
```ruby
r = Ractor.new {sleep(500)}
r.close_incoming  #=> false
r.close_incoming  #=> true
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
```
```ruby
# File ruby_3_4_1/ractor.rb, line 750
def close_incoming
  __builtin_cexpr! %q{
    ractor_close_incoming(ec, RACTOR_PTR(self));
  }
end
```
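close_outgoing
Closes the outgoing port and returns whether it was already closed. All further attempts to call Ractor.yield in the ractor, or to take from the ractor, will fail with Ractor::ClosedError.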
```ruby
r = Ractor.new {sleep(500)}
r.close_outgoing  #=> false
r.close_outgoing  #=> true
r.take
# Ractor::ClosedError (The outgoing-port is already closed)
```
```ruby
# File ruby_3_4_1/ractor.rb, line 768
def close_outgoing
  __builtin_cexpr! %q{
    ractor_close_outgoing(ec, RACTOR_PTR(self));
  }
end
```
inspect
```ruby
# File ruby_3_4_1/ractor.rb, line 716
def inspect
  loc  = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
  name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
  id   = __builtin_cexpr! %q{ UINT2NUM(rb_ractor_id(RACTOR_PTR(self))) }
  status = __builtin_cexpr! %q{
    rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_))
  }
  "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>"
end
```
name
The name set in Ractor.new, or nil.
```ruby
# File ruby_3_4_1/ractor.rb, line 729
def name
  __builtin_cexpr! %q{RACTOR_PTR(self)->name}
end
```
send(obj, move: false)
Sends a message to a ractor's incoming queue, to be accepted by Ractor.receive.
```ruby
r = Ractor.new do
  value = Ractor.receive
  puts "Received: #{value}"
end
r.send 'message'
r.take
# Prints: "Received: message"
```
The method is non-blocking (it returns immediately even if the ractor is not ready to receive anything):
```ruby
r = Ractor.new {sleep(5)}
r.send('test')
puts "Sent successfully"
# Prints: "Sent successfully" immediately
```
An attempt to send to a ractor that has already finished executing raises Ractor::ClosedError.
```ruby
r = Ractor.new {}
r.take
p r
# "#<Ractor:#6 (irb):23 terminated>"
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
```
If close_incoming was called on the ractor, the method also raises Ractor::ClosedError.
```ruby
r = Ractor.new do
  sleep(500)
  receive
end
r.close_incoming
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
# The error is raised immediately, not when the ractor tries to receive
```
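If obj is unshareable, by default it is copied into the receiving ractor by deep cloning. If move: true is passed, the object is moved into the receiving ractor and becomes inaccessible to the sender.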
```ruby
r = Ractor.new {puts "Received: #{receive}"}
msg = 'message'
r.send(msg, move: true)
r.take
p msg
```
This prints:
```
Received: message
in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>
```
All references to the object and its parts become invalid for the sender.
```ruby
r = Ractor.new {puts "Received: #{receive}"}
s = 'message'
ary = [s]
copy = ary.dup
r.send(ary, move: true)

s.inspect
# Ractor::MovedError (can not send any methods to a moved object)
ary.class
# Ractor::MovedError (can not send any methods to a moved object)
copy.class
# => Array, it is a different object
copy[0].inspect
# Ractor::MovedError (can not send any methods to a moved object)
# ...but its item was still a reference to `s`, which was moved
```
If the object is shareable, move: true has no effect on it:
```ruby
r = Ractor.new {puts "Received: #{receive}"}
s = 'message'.freeze
r.send(s, move: true)
s.inspect #=> "message", still available
```
```ruby
# File ruby_3_4_1/ractor.rb, line 599
def send(obj, move: false)
  __builtin_cexpr! %q{
    ractor_send(ec, RACTOR_PTR(self), obj, move)
  }
end
```
take
Gets a message from the ractor's outgoing port, which was put there by Ractor.yield or at the ractor's termination.
```ruby
r = Ractor.new do
  Ractor.yield 'explicit yield'
  'last value'
end
puts r.take #=> 'explicit yield'
puts r.take #=> 'last value'
puts r.take # Ractor::ClosedError (The outgoing-port is already closed)
```
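Because the last value is also sent to the outgoing port, take can be used as an analog of Thread#join ("just wait until the ractor finishes"). However, it raises an exception if somebody has already consumed that message.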
If the outgoing port was closed with close_outgoing, the method raises Ractor::ClosedError.
```ruby
r = Ractor.new do
  sleep(500)
  Ractor.yield 'Hello from ractor'
end
r.close_outgoing
r.take
# Ractor::ClosedError (The outgoing-port is already closed)
# The error is raised immediately, not when the ractor tries to yield
```
If an uncaught exception is raised in the Ractor, it is propagated by take as a Ractor::RemoteError.
```ruby
r = Ractor.new {raise "Something weird happened"}

begin
  r.take
rescue => e
  p e              # => #<Ractor::RemoteError: thrown by remote Ractor.>
  p e.ractor == r  # => true
  p e.cause        # => #<RuntimeError: Something weird happened>
end
```
Ractor::ClosedError is a descendant of StopIteration, so the termination of the ractor breaks out of any loop that receives this message without propagating the error:
```ruby
r = Ractor.new do
  3.times {|i| Ractor.yield "message #{i}"}
  "finishing"
end

loop {puts "Received: " + r.take}
puts "Continue successfully"
```
This will print:
```
Received: message 0
Received: message 1
Received: message 2
Received: finishing
Continue successfully
```
```ruby
# File ruby_3_4_1/ractor.rb, line 710
def take
  __builtin_cexpr! %q{
    ractor_take(ec, RACTOR_PTR(self))
  }
end
```
Private Instance Methods
receive
Same as Ractor.receive
```ruby
# File ruby_3_4_1/ractor.rb, line 441
def receive
  __builtin_cexpr! %q{
    ractor_receive(ec, rb_ec_ractor_ptr(ec))
  }
end
```
receive_if { |msg| ... }
Same as Ractor.receive_if
```ruby
# File ruby_3_4_1/ractor.rb, line 514
def receive_if &b
  Primitive.ractor_receive_if b
end
```