Ractor
- Ruby 的 Actor 式并发抽象¶ ↑
Ractor
旨在为 Ruby 提供一种并行执行功能,而无需担心线程安全问题。
摘要¶ ↑
解释器进程中的多个协程¶ ↑
您可以创建多个协程,它们并行运行。
-
Ractor.new{ expr }
创建一个新的Ractor
,expr
在并行计算机上并行运行。 -
解释器使用第一个
Ractor
(称为主协程)进行调用。 -
如果主
Ractor
终止,所有协程都会收到终止请求,就像线程一样(如果主线程(第一个调用的Thread
),Ruby 解释器会向所有正在运行的线程发送终止执行的请求)。 -
每个
Ractor
都有 1 个或多个线程。 -
一个
Ractor
中的线程共享一个协程范围的全局锁,类似于 GIL(在 MRI 术语中称为 GVL),因此它们不能并行运行(除非在 C 级显式释放 GVL)。不同协程中的线程可以并行运行。
多个协程之间的有限共享¶ ↑
协程不像线程那样共享所有内容。
-
大多数对象是不可共享对象,因此您无需担心由共享引起的线程安全问题。
-
有些对象是可共享对象。
-
不可变对象:冻结的对象,不引用不可共享对象。
-
i = 123
:i
是一个不可变对象。 -
s = "str".freeze
:s
是一个不可变对象。 -
a = [1, [2], 3].freeze
:a
不是一个不可变对象,因为a
引用了不可共享对象[2]
(它没有被冻结)。 -
h = {c: Object}.freeze
:h
是一个不可变对象,因为h
引用了Symbol
:c
和可共享的Object
类对象,该对象没有被冻结。
-
-
类/模块对象
-
特殊的可共享对象
-
Ractor
对象本身。 -
等等…
-
Ractor 之间的两种类型通信¶ ↑
Ractor 通过彼此之间交换消息来进行通信并同步执行。有两种消息交换协议:推送类型(消息传递)和拉取类型。
-
推送类型消息传递:
Ractor#send(obj)
和Ractor.receive()
对。 -
发送方 Ractor 通过
r.send(obj)
将obj
传递给 Ractorr
,接收方 Ractor 使用Ractor.receive
接收消息。 -
发送方知道目标
Ractor
r
,而接收方不知道发送方(接受来自任何 Ractor 的所有消息)。 -
接收方具有无限队列,发送方将消息入队。发送方不会阻塞以将消息放入此队列。
-
许多其他基于 Actor 的语言都采用了这种类型的消息交换。
-
Ractor.receive_if{ filter_expr }
是Ractor.receive
的变体,用于选择消息。 -
拉取类型通信:
Ractor.yield(obj)
和Ractor#take()
对。 -
发送方 Ractor 通过
Ractor.yield(obj)
声明要生成obj
,接收方Ractor
使用r.take
获取它。 -
如果另一方不存在,发送方或接收方将被阻塞。
用于发送消息的复制和移动语义¶ ↑
要将不可共享对象作为消息发送,需要复制或移动对象。
-
复制:使用深层复制。
-
移动:移动成员资格。
-
移动对象后,发送方无法访问已移动的对象。
-
保证至少只有一个
Ractor
可以访问该对象。
线程安全性¶ ↑
Ractor
有助于编写线程安全的并发程序,但我们也可以使用 Ractor 编写线程不安全的程序。
-
优点:共享限制
-
大多数对象都是不可共享的,因此我们无法创建数据竞争和竞争条件程序。
-
可共享对象受解释器或锁定机制的保护。
-
错误:类/模块可能违反此假设
-
为了与旧行为兼容,类和模块可能会引入数据竞争等问题。
-
Ruby 程序员在多
Ractor
程序中修改类/模块对象时应注意。 -
错误:
Ractor
无法解决所有线程安全问题 -
存在一些阻塞操作(等待发送、等待 yield 和等待获取),因此您可以创建具有死锁和活锁问题的程序。
-
某些类型的可共享对象可以引入事务(例如 STM)。但是,错误使用事务会导致状态不一致。
没有 Ractor
,我们需要跟踪所有状态修改以调试线程安全问题。有了 Ractor
,您可以专注于与 Ractor 共享的可疑代码。
创建和终止¶ ↑
Ractor.new
¶ ↑
-
Ractor.new{ expr }
生成另一个Ractor
。
# Ractor.new with a block creates new Ractor r = Ractor.new do # This block will be run in parallel with other ractors end # You can name a Ractor with `name:` argument. r = Ractor.new name: 'test-name' do end # and Ractor#name returns its name. r.name #=> 'test-name'
给定块隔离¶ ↑
该 Ractor
在给定块中执行给定的 expr
。给定块将通过 Proc#isolate
方法与外部范围隔离(尚未向 Ruby 用户公开)。为了防止在 Ractor 之间共享不可共享的对象,块外部变量、self
和其他信息将被隔离。
Proc#isolate
在 Ractor
创建时调用(当调用 Ractor.new
时)。如果给定的 Proc
对象由于外部变量等原因无法隔离,则会引发错误。
begin a = true r = Ractor.new do a #=> ArgumentError because this block accesses `a`. end r.take # see later rescue ArgumentError end
-
给定块的
self
是Ractor
对象本身。
r = Ractor.new do p self.class #=> Ractor self.object_id end r.take == self.object_id #=> false
传递给 Ractor.new()
的参数将成为给定块的块参数。但是,解释器不会传递参数对象引用,而是将它们作为消息发送(有关详细信息,请参见下文)。
r = Ractor.new 'ok' do |msg| msg #=> 'ok' end r.take #=> 'ok'
# almost similar to the last example r = Ractor.new do msg = Ractor.receive msg end r.send 'ok' r.take #=> 'ok'
给定块的执行结果¶ ↑
给定块的返回值将成为传出消息(有关详细信息,请参见下文)。
r = Ractor.new do 'ok' end r.take #=> `ok`
# almost similar to the last example r = Ractor.new do Ractor.yield 'ok' end r.take #=> 'ok'
给定块中的错误将传播到传出消息的接收者。
r = Ractor.new do raise 'ok' # exception will be transferred to the receiver end begin r.take rescue Ractor::RemoteError => e e.cause.class #=> RuntimeError e.cause.message #=> 'ok' e.ractor #=> r end
Ractor 之间的通信¶ ↑
Ractor 之间的通信是通过发送和接收消息来实现的。有两种方法可以相互通信。
-
(1) 消息发送/接收
-
(1-1) 推送类型发送/接收(发送方知道接收方)。类似于 Actor 模型。
-
(1-2) 拉取类型 yield/take(接收方知道发送方)。
-
(2) 使用可共享容器对象
-
Ractor::TVar gem (ko1/ractor-tvar)
-
更多?
用户可以使用 (1) 控制程序执行时间,但不要使用 (2) 控制(仅作为临界区管理)。
对于消息发送和接收,有两种类型的 API:推送类型和拉取类型。
-
(1-1) 发送/接收(推送类型)
-
Ractor#send(obj)
(Ractor#<<(obj)
是别名)将消息发送到 Ractor 的传入端口。传入端口连接到无限大小的传入队列,因此Ractor#send
永远不会阻塞。 -
Ractor.receive
从其自己的传入队列中出队一条消息。如果传入队列为空,Ractor.receive
调用将阻塞。 -
Ractor.receive_if{|msg| filter_expr }
是Ractor.receive
的变体。receive_if
仅接收filter_expr
为真的消息(因此Ractor.receive
等同于Ractor.receive_if{ true }
)。 -
(1-2) yield/take(拉取类型)
-
Ractor.yield(obj)
通过传出端口将消息发送到正在调用Ractor#take
的Ractor
。如果没有 Ractor 正在等待它,Ractor.yield(obj)
将阻塞。如果多个 Ractor 正在等待Ractor.yield(obj)
,则只有一个Ractor
可以接收消息。 -
Ractor#take
接收由Ractor.yield(obj)
方法从指定Ractor
等待的消息。如果Ractor
尚未调用Ractor.yield
,则Ractor#take
调用将阻塞。 -
Ractor.select()
可以等待take
、yield
和receive
的成功。 -
您可以关闭传入端口或传出端口。
-
您可以使用
Ractor#close_incoming
和Ractor#close_outgoing
关闭它们。 -
如果
Ractor
的传入端口关闭,则您无法向Ractor
发送消息。如果Ractor.receive
由于关闭的传入端口而被阻塞,则它将引发异常。 -
如果
Ractor
的传出端口关闭,则您无法在Ractor
上调用Ractor#take
和Ractor.yield
。如果 Ractor 由于Ractor#take
或Ractor.yield
而被阻塞,则关闭传出端口将在这些阻塞的 Ractor 上引发异常。 -
当
Ractor
终止时,Ractor 的端口将关闭。 -
有三种方法可以将对象作为消息发送
-
(1) 发送引用:发送可共享对象,仅发送对对象的引用(快速)
-
(2) 复制对象:通过深度复制不可共享对象来发送不可共享对象(慢)。请注意,您无法发送不支持深度复制的对象。某些
T_DATA
对象不受支持。 -
(3) 移动对象:使用成员资格发送不可共享对象引用。发送方
Ractor
在移动对象后将无法再访问已移动的对象(引发异常)。当前实现为接收方Ractor
创建一个新对象作为已移动对象,并将发送方对象的引用复制到已移动对象。 -
您可以通过
move:
关键字、Ractor#send(obj, move: true/false)
和Ractor.yield(obj, move: true/false)
(默认值为false
(COPY))来选择“复制”和“移动”。
发送/接收端口¶ ↑
每个Ractor
都有输入端口和输出端口。输入端口连接到无限大小的输入队列。
Ractor r +-------------------------------------------+ | incoming outgoing | | port port | r.send(obj) ->*->[incoming queue] Ractor.yield(obj) ->*-> r.take | | | | v | | Ractor.receive | +-------------------------------------------+ Connection example: r2.send obj on r1、Ractor.receive on r2 +----+ +----+ * r1 |---->* r2 * +----+ +----+ Connection example: Ractor.yield(obj) on r1, r1.take on r2 +----+ +----+ * r1 *---->- r2 * +----+ +----+ Connection example: Ractor.yield(obj) on r1 and r2, and waiting for both simultaneously by Ractor.select(r1, r2) +----+ * r1 *------+ +----+ | +----> Ractor.select(r1, r2) +----+ | * r2 *------| +----+
r = Ractor.new do msg = Ractor.receive # Receive from r's incoming queue msg # send back msg as block return value end r.send 'ok' # Send 'ok' to r's incoming port -> incoming queue r.take # Receive from r's outgoing port
最后一个示例显示了以下 ractor 网络。
+------+ +---+ * main |------> * r *---+ +------+ +---+ | ^ | +-------------------+
可以使用Ractor.new
的参数简化此代码。
# Actual argument 'ok' for `Ractor.new()` will be sent to created Ractor. r = Ractor.new 'ok' do |msg| # Values for formal parameters will be received from incoming queue. # Similar to: msg = Ractor.receive msg # Return value of the given block will be sent via outgoing port end # receive from the r's outgoing port. r.take #=> `ok`
Ractor.new
的块返回值¶ ↑
如前所述,Ractor.new
的返回值(Ractor.new{ expr }
中expr
的计算值)可以通过Ractor#take
获取。
Ractor.new{ 42 }.take #=> 42
当块返回值可用时,Ractor
将处于死亡状态,因此除了获取的Ractor
之外,任何 ractors 都无法触及返回值,因此任何值都可以通过此通信路径发送而无需任何修改。
r = Ractor.new do a = "hello" binding end r.take.eval("p a") #=> "hello" (other communication path can not send a Binding object directly)
使用Ractor.select
等待多个 Ractors¶ ↑
您可以使用Ractor.select(*ractors)
等待多个 Ractor 的yield
。Ractor.select()
的返回值为[r, msg]
,其中r
是正在产生yield
的Ractor
,msg
是产生的消息。
等待单个 ractor(与Ractor.take
相同)
r1 = Ractor.new{'r1'} r, obj = Ractor.select(r1) r == r1 and obj == 'r1' #=> true
等待两个 ractors
r1 = Ractor.new{'r1'} r2 = Ractor.new{'r2'} rs = [r1, r2] as = [] # Wait for r1 or r2's Ractor.yield r, obj = Ractor.select(*rs) rs.delete(r) as << obj # Second try (rs only contain not-closed ractors) r, obj = Ractor.select(*rs) rs.delete(r) as << obj as.sort == ['r1', 'r2'] #=> true
Complex
示例
pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end RN = 10 rs = RN.times.map{|i| Ractor.new pipe, i do |pipe, i| msg = pipe.take msg # ping-pong end } RN.times{|i| pipe << i } RN.times.map{ r, n = Ractor.select(*rs) rs.delete r n }.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
多个 Ractors 可以发送到一个Ractor
。
# Create 10 ractors and they send objects to pipe ractor. # pipe ractor yield received objects pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end RN = 10 rs = RN.times.map{|i| Ractor.new pipe, i do |pipe, i| pipe << i end } RN.times.map{ pipe.take }.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
TODO:当前Ractor.select()
与select(2)
存在相同的问题,因此应改进此接口。
TODO:go 语言的select
语法使用循环轮询技术来实现公平调度。现在Ractor.select()
没有使用它。
关闭 Ractor 的端口¶ ↑
-
Ractor#close_incoming/outgoing
关闭输入/输出端口(类似于Queue#close
)。 -
Ractor#close_incoming
-
r.send(obj)
(其中r
的输入端口已关闭)将引发异常。 -
当输入队列为空且输入端口已关闭时,
Ractor.receive
将引发异常。如果输入队列不为空,它将出队一个对象,不会引发异常。 -
Ractor#close_outgoing
-
在已关闭输出端口的
Ractor
上执行Ractor.yield
将引发异常。 -
对于已经关闭输出端口的
Ractor
,Ractor#take
会抛出异常。如果Ractor#take
是阻塞的,它也会抛出异常。 -
当
Ractor
终止时,端口会自动关闭。 -
即使实现终止了底层原生线程,Ractor 块的返回值也会作为
Ractor.yield(ret_val)
产生。
示例(尝试从已关闭的 Ractor
中获取)
r = Ractor.new do 'finish' end r.take # success (will return 'finish') begin o = r.take # try to take from closed Ractor rescue Ractor::ClosedError 'ok' else "ng: #{o}" end
示例(尝试发送到已关闭(已终止)的 Ractor
)
r = Ractor.new do end r.take # wait terminate begin r.send(1) rescue Ractor::ClosedError 'ok' else 'ng' end
当多个 Ractor 正在等待 Ractor.yield()
时,Ractor#close_outgoing
会通过抛出异常(ClosedError
)来取消所有阻塞。
通过复制发送消息¶ ↑
如果 obj
是不可共享对象,Ractor#send(obj)
或 Ractor.yield(obj)
会深度复制 obj
。
obj = 'str'.dup r = Ractor.new obj do |msg| # return received msg's object_id msg.object_id end obj.object_id == r.take #=> false
某些对象不支持复制值,会抛出异常。
obj = Thread.new{} begin Ractor.new obj do |msg| msg end rescue TypeError => e e.message #=> #<TypeError: allocator undefined for Thread> else 'ng' # unreachable here end
通过移动发送消息¶ ↑
Ractor#send(obj, move: true)
或 Ractor.yield(obj, move: true)
会将 obj
移动到目标 Ractor
。如果源 Ractor
接触了已移动的对象(例如,调用 obj.foo()
之类的函数),则会发生错误。
# move with Ractor#send r = Ractor.new do obj = Ractor.receive obj << ' world' end str = 'hello' r.send str, move: true modified = r.take #=> 'hello world' # str is moved, and accessing str from this Ractor is prohibited begin # Error because it touches moved str. str << ' exception' # raise Ractor::MovedError rescue Ractor::MovedError modified #=> 'hello world' else raise 'unreachable' end
# move with Ractor.yield r = Ractor.new do obj = 'hello' Ractor.yield obj, move: true obj << 'world' # raise Ractor::MovedError end str = r.take begin r.take rescue Ractor::RemoteError p str #=> "hello" end
某些对象不支持移动,会抛出异常。
r = Ractor.new do Ractor.receive end r.send(Thread.new{}, move: true) #=> allocator undefined for Thread (TypeError)
为了实现对已移动对象的访问禁止,使用了类替换技术来实现它。
可共享对象¶ ↑
以下对象是可共享的。
-
不可变对象
-
小整数,一些符号,
true
,false
,nil
(在内部也称为SPECIAL_CONST_P()
对象) -
冻结的原生对象
-
Numeric
对象:Float
,Complex
,Rational
,大整数(在内部为T_BIGNUM
) -
所有符号。
-
-
冻结的
String
和Regexp
对象(它们的实例变量应该只引用可共享对象) -
Ractor
和其他关心同步的特殊对象。
实现:现在可共享对象 (RVALUE
) 具有 FL_SHAREABLE
标志。此标志可以延迟添加。
为了创建可共享对象,提供了 Ractor.make_shareable(obj)
方法。在这种情况下,尝试通过冻结 obj
和递归遍历对象来使其可共享。此方法接受 copy:
关键字(默认值为 false)。Ractor.make_shareable(obj, copy: true)
尝试对 obj
进行深拷贝,并使复制后的对象可共享。
语言更改以隔离 Ractor 之间的不可共享对象¶ ↑
为了隔离 Ractor 之间的不可共享对象,我们在多 Ractor Ruby 程序中引入了额外的语言语义。
请注意,在不使用 Ractor 的情况下,不需要这些额外的语义(与 Ruby 2 完全兼容)。
全局变量¶ ↑
只有主 Ractor
(在解释器启动时创建的 Ractor
)可以访问全局变量。
$gv = 1 r = Ractor.new do $gv end begin r.take rescue Ractor::RemoteError => e e.cause.message #=> 'can not access global variables from non-main Ractors' end
请注意,一些特殊的全局变量是 Ractor 本地的,例如 $stdin
、$stdout
、$stderr
。有关更多详细信息,请参见 [Bug #17268]。
可共享对象的实例变量¶ ↑
如果引用值为可共享对象,则可以从非主 Ractor 获取类/模块的实例变量。
class C @iv = 1 end p Ractor.new do class C @iv end end.take #=> 1
否则,只有主 Ractor
可以访问可共享对象的实例变量。
class C @iv = [] # unshareable object end Ractor.new do class C begin p @iv rescue Ractor::IsolationError p $!.message #=> "can not get unshareable values from instance variables of classes/modules from non-main Ractors" end begin @iv = 42 rescue Ractor::IsolationError p $!.message #=> "can not set instance variables of classes/modules by non-main Ractors" end end end.take
shared = Ractor.new{} shared.instance_variable_set(:@iv, 'str') r = Ractor.new shared do |shared| p shared.instance_variable_get(:@iv) end begin r.take rescue Ractor::RemoteError => e e.cause.message #=> can not access instance variables of shareable objects from non-main Ractors (Ractor::IsolationError) end
请注意,类/模块对象的实例变量在 Ractor 上也是禁止的。
Class
变量¶ ↑
只有主 Ractor
可以访问类变量。
class C @@cv = 'str' end r = Ractor.new do class C p @@cv end end begin r.take rescue => e e.class #=> Ractor::IsolationError end
常量¶ ↑
只有主 Ractor
可以读取引用不可共享对象的常量。
class C CONST = 'str' end r = Ractor.new do C::CONST end begin r.take rescue => e e.class #=> Ractor::IsolationError end
只有主 Ractor
可以定义引用不可共享对象的常量。
class C end r = Ractor.new do C::CONST = 'str' end begin r.take rescue => e e.class #=> Ractor::IsolationError end
为了使多 Ractor 支持的库,常量应该只引用可共享对象。
TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}
在这种情况下,TABLE
引用了一个不可共享的 Hash
对象。因此,其他 Ractor 无法引用 TABLE
常量。为了使其可共享,我们可以使用 Ractor.make_shareable()
,如下所示。
TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )
为了简化操作,Ruby 3.0 引入了新的 `shareable_constant_value` 指令。
# shareable_constant_value: literal TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'} #=> Same as: TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )
`shareable_constant_value` 指令接受以下模式(描述使用示例:`CONST = expr`)
-
none:不做任何操作。与 `CONST = expr` 相同。
-
literal
-
如果 `expr` 由字面量组成,则替换为 `CONST = Ractor.make_shareable(expr)`。
-
否则:替换为 `CONST = expr.tap{|o| raise unless Ractor.shareable?(o)}`。
-
experimental_everything:替换为 `CONST = Ractor.make_shareable(expr)`。
-
experimental_copy:替换为 `CONST = Ractor.make_shareable(expr, copy: true)`。
除了 `none` 模式(默认)之外,保证分配的常量仅引用可共享对象。
有关更多详细信息,请参阅 doc/syntax/comments.rdoc。
实现说明¶ ↑
-
每个
Ractor
都有自己的 ID (rb_ractor_t::pub::id
)。 -
在调试模式下,所有不可共享对象都用当前 Ractor 的 ID 进行标记,并在 VM 中检查以检测不可共享对象泄漏(从不同的
Ractor
访问对象)。
示例¶ ↑
Actor 模型中的传统环形示例¶ ↑
RN = 1_000 CR = Ractor.current r = Ractor.new do p Ractor.receive CR << :fin end RN.times{ r = Ractor.new r do |next_r| next_r << Ractor.receive end } p :setup_ok r << 1 p Ractor.receive
Fork-join¶ ↑
def fib n if n < 2 1 else fib(n-2) + fib(n-1) end end RN = 10 rs = (1..RN).map do |i| Ractor.new i do |i| [i, fib(i)] end end until rs.empty? r, v = Ractor.select(*rs) rs.delete r p answer: v end
工作池¶ ↑
require 'prime' pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end N = 1000 RN = 10 workers = (1..RN).map do Ractor.new pipe do |pipe| while n = pipe.take Ractor.yield [n, n.prime?] end end end (1..N).each{|i| pipe << i } pp (1..N).map{ _r, (n, b) = Ractor.select(*workers) [n, b] }.sort_by{|(n, b)| n}
管道¶ ↑
# pipeline with yield/take r1 = Ractor.new do 'r1' end r2 = Ractor.new r1 do |r1| r1.take + 'r2' end r3 = Ractor.new r2 do |r2| r2.take + 'r3' end p r3.take #=> 'r1r2r3'
# pipeline with send/receive r3 = Ractor.new Ractor.current do |cr| cr.send Ractor.receive + 'r3' end r2 = Ractor.new r3 do |r3| r3.send Ractor.receive + 'r2' end r1 = Ractor.new r2 do |r2| r2.send Ractor.receive + 'r1' end r1 << 'r0' p Ractor.receive #=> "r0r1r2r3"
监督¶ ↑
# ring example again r = Ractor.current (1..10).map{|i| r = Ractor.new r, i do |r, i| r.send Ractor.receive + "r#{i}" end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
# ring example with an error r = Ractor.current rs = (1..10).map{|i| r = Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1" r.send "r0" p Ractor.select(*rs, Ractor.current) #=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"] r.send "e0" p Ractor.select(*rs, Ractor.current) #=> #<Thread:0x000056262de28bd8 run> terminated with exception (report_on_exception is true): Traceback (most recent call last): 2: from /home/ko1/src/ruby/trunk/test.rb7:in `block (2 levels) in <main>' 1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop' /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception Traceback (most recent call last): 2: from /home/ko1/src/ruby/trunk/test.rb7:in `block (2 levels) in <main>' 1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop' /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception 1: from /home/ko1/src/ruby/trunk/test.rb21:in `<main>' <internal:ractor>:69:in `select': thrown by remote Ractor. (Ractor::RemoteError)
# resend non-error message r = Ractor.current rs = (1..10).map{|i| r = Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1" r.send "r0" p Ractor.select(*rs, Ractor.current) [:receive, "r0r10r9r8r7r6r5r4r3r2r1"] msg = 'e0' begin r.send msg p Ractor.select(*rs, Ractor.current) rescue Ractor::RemoteError msg = 'r0' retry end #=> <internal:ractor>:100:in `send': The incoming-port is already closed (Ractor::ClosedError) # because r == r[-1] is terminated.
# ring example with supervisor and re-start def make_ractor r, i Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end end r = Ractor.current rs = (1..10).map{|i| r = make_ractor(r, i) } msg = 'e0' # error causing message begin r.send msg p Ractor.select(*rs, Ractor.current) rescue Ractor::RemoteError r = rs[-1] = make_ractor(rs[-2], rs.size-1) msg = 'x0' retry end #=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]