Ractor
- Ruby 的类 Actor 并发抽象¶ ↑
Ractor
旨在提供 Ruby 的并行执行特性,而无需担心线程安全问题。
概述¶ ↑
解释器进程中的多个 Ractor¶ ↑
您可以创建多个 Ractor,它们可以并行运行。
-
Ractor.new{ expr }
创建一个新的Ractor
,并且expr
在并行计算机上并行运行。 -
解释器使用第一个
Ractor
(称为主 Ractor)调用。 -
如果主
Ractor
终止,则所有 Ractor 都会收到像线程一样的终止请求(如果主线程(第一个调用的Thread
)终止,Ruby 解释器会发送所有正在运行的线程以终止执行)。 -
每个
Ractor
都有 1 个或多个线程。 -
一个
Ractor
中的线程共享一个 Ractor 范围的全局锁,如 GIL(在 MRI 术语中为 GVL),因此它们不能并行运行(除非在 C 级显式释放 GVL)。不同 Ractor 中的线程并行运行。
多个 Ractor 之间的有限共享¶ ↑
与线程不同,Ractor 不共享所有内容。
-
大多数对象是不可共享对象,因此您无需担心由共享引起的线程安全问题。
-
某些对象是可共享对象。
-
不可变对象:不引用不可共享对象的冻结对象。
-
i = 123
:i
是一个不可变对象。 -
s = "str".freeze
:s
是一个不可变对象。 -
a = [1, [2], 3].freeze
:a
不是一个不可变对象,因为a
引用不可共享对象[2]
(未冻结)。 -
h = {c: Object}.freeze
:h
是一个不可变对象,因为h
引用Symbol
:c
和可共享的Object
类对象,该对象未冻结。
-
-
类/模块对象
-
特殊的可共享对象
-
Ractor
对象本身。 -
还有更多...
-
Ractor 之间的两种通信方式¶ ↑
Ractor 通过 Ractor 之间的消息交换进行相互通信并同步执行。有两种消息交换协议:推送类型(消息传递)和拉取类型。
-
推送类型消息传递:
Ractor#send(obj)
和Ractor.receive()
对。 -
发送者 Ractor 通过
r.send(obj)
将obj
传递给 Ractorr
,接收者 Ractor 使用Ractor.receive
接收消息。 -
发送者知道目标
Ractor
r
,而接收者不知道发送者(接受来自任何 Ractor 的所有消息)。 -
接收者具有无限队列,并且发送者将消息排入队列。发送者不会阻塞以将消息放入此队列。
-
许多其他基于 Actor 的语言都采用这种类型的消息交换。
-
Ractor.receive_if{ filter_expr }
是Ractor.receive
的变体,用于选择消息。 -
拉取类型通信:
Ractor.yield(obj)
和Ractor#take()
对。 -
发送者 Ractor 通过
Ractor.yield(obj)
声明生成obj
,接收者Ractor
通过r.take
获取它。 -
如果另一方不存在,发送者或接收者将阻塞。
发送消息的复制和移动语义¶ ↑
要将不可共享对象作为消息发送,需要复制或移动对象。
-
复制:使用深层复制。
-
移动:移动所有权。
-
发送者在移动对象后无法访问移动的对象。
-
保证至少只有一个
Ractor
可以访问该对象。
线程安全¶ ↑
Ractor
有助于编写线程安全的并发程序,但我们也可以使用 Ractor 编写线程不安全的程序。
-
优点:共享限制
-
大多数对象是不可共享的,因此我们无法编写数据竞争和竞争条件的程序。
-
可共享对象受到解释器或锁定机制的保护。
-
缺点:类/模块可能会违反此假设
-
为了与旧的行为兼容,类和模块可能会引入数据竞争等问题。
-
如果 Ruby 程序员在多
Ractor
程序中修改类/模块对象,则应注意。 -
缺点:
Ractor
不能解决所有线程安全问题 -
有几个阻塞操作(等待发送、等待 yield 和等待 take),因此您可以编写一个具有死锁和活锁问题的程序。
-
某些类型的可共享对象可以引入事务(例如 STM)。但是,滥用事务会产生不一致的状态。
如果没有 Ractor
,我们需要跟踪所有状态变化以调试线程安全问题。使用 Ractor
,您可以专注于与 Ractor 共享的可疑代码。
创建和终止¶ ↑
Ractor.new
¶ ↑
-
Ractor.new{ expr }
生成另一个Ractor
。
# Ractor.new with a block creates new Ractor r = Ractor.new do # This block will be run in parallel with other ractors end # You can name a Ractor with `name:` argument. r = Ractor.new name: 'test-name' do end # and Ractor#name returns its name. r.name #=> 'test-name'
给定代码块隔离¶ ↑
Ractor
在给定的代码块中执行给定的 expr
。给定代码块将通过 Proc#isolate
方法与外部范围隔离(尚未向 Ruby 用户公开)。为了防止在 Ractor 之间共享不可共享的对象,代码块的外部变量、self
和其他信息将被隔离。
Proc#isolate
在 Ractor
创建时(当调用 Ractor.new
时)调用。如果由于外部变量等原因而无法隔离给定的 Proc
对象,则会引发错误。
begin a = true r = Ractor.new do a #=> ArgumentError because this block accesses `a`. end r.take # see later rescue ArgumentError end
-
给定代码块的
self
是Ractor
对象本身。
r = Ractor.new do p self.class #=> Ractor self.object_id end r.take == self.object_id #=> false
传递给 Ractor.new()
的参数将成为给定代码块的代码块参数。但是,解释器不会传递参数对象引用,而是将它们作为消息发送(有关详细信息,请参见下文)。
r = Ractor.new 'ok' do |msg| msg #=> 'ok' end r.take #=> 'ok'
# almost similar to the last example r = Ractor.new do msg = Ractor.receive msg end r.send 'ok' r.take #=> 'ok'
给定代码块的执行结果¶ ↑
给定代码块的返回值将成为传出消息(有关详细信息,请参见下文)。
r = Ractor.new do 'ok' end r.take #=> `ok`
# almost similar to the last example r = Ractor.new do Ractor.yield 'ok' end r.take #=> 'ok'
给定代码块中的错误将传播到传出消息的接收者。
r = Ractor.new do raise 'ok' # exception will be transferred to the receiver end begin r.take rescue Ractor::RemoteError => e e.cause.class #=> RuntimeError e.cause.message #=> 'ok' e.ractor #=> r end
Ractor 之间的通信¶ ↑
Ractor 之间的通信是通过发送和接收消息来实现的。有两种相互通信的方式。
-
(1) 消息发送/接收
-
(1-1) 推送类型发送/接收(发送者知道接收者)。类似于 Actor 模型。
-
(1-2) 拉取类型 yield/take(接收者知道发送者)。
-
(2) 使用可共享的容器对象
-
Ractor::TVar gem (ko1/ractor-tvar)
-
更多?
用户可以使用 (1) 控制程序执行时序,但不应该使用 (2) 控制(仅作为临界区管理)。
对于消息发送和接收,有两种类型的 API:推送类型和拉取类型。
-
(1-1) 发送/接收(推送类型)
-
Ractor#send(obj)
(Ractor#<<(obj)
是别名) 向 Ractor 的传入端口发送消息。传入端口连接到无限大小的传入队列,因此Ractor#send
永远不会阻塞。 -
Ractor.receive
从其自身的传入队列中移除一条消息。如果传入队列为空,则Ractor.receive
调用将会阻塞。 -
Ractor.receive_if{|msg| filter_expr }
是Ractor.receive
的变体。receive_if
仅接收filter_expr
为 true 的消息(因此Ractor.receive
等同于Ractor.receive_if{ true }
)。 -
(1-2) yield/take(拉取类型)
-
Ractor.yield(obj)
向通过传出端口调用Ractor#take
的Ractor
发送消息。如果没有 Ractor 等待接收,则Ractor.yield(obj)
将会阻塞。如果有多个 Ractor 等待Ractor.yield(obj)
,则只有一个Ractor
可以接收消息。 -
Ractor#take
接收来自指定的Ractor
,并通过Ractor.yield(obj)
方法等待的消息。如果Ractor
尚未调用Ractor.yield
,则Ractor#take
调用将会阻塞。 -
Ractor.select()
可以等待take
、yield
和receive
的成功。 -
你可以关闭传入端口或传出端口。
-
你可以使用
Ractor#close_incoming
和Ractor#close_outgoing
关闭它们。 -
如果
Ractor
的传入端口已关闭,则你不能send
消息给该Ractor
。如果Ractor.receive
因传入端口关闭而阻塞,则会引发异常。 -
如果
Ractor
的传出端口已关闭,则你不能在该Ractor
上调用Ractor#take
和Ractor.yield
。如果 Ractor 因Ractor#take
或Ractor.yield
而阻塞,则关闭传出端口将会在这些阻塞的 Ractor 上引发异常。 -
当
Ractor
终止时,该 Ractor 的端口将被关闭。 -
有 3 种方式可以发送对象作为消息
-
(1) 发送引用:发送可共享对象时,仅发送对该对象的引用(快速)。
-
(2) 复制对象:通过深度复制来发送不可共享的对象(慢)。请注意,你不能发送不支持深度复制的对象。一些
T_DATA
对象(其类在 C 扩展中定义的对象,例如StringIO
)不受支持。 -
(3) 移动对象:发送具有所有权的不可共享对象引用。发送者
Ractor
在移动对象后,不能再访问移动的对象(会引发异常)。当前实现会为接收者Ractor
创建新对象作为移动对象,并将发送对象的引用复制到移动对象。不支持T_DATA
对象。 -
你可以使用
move:
关键字选择“复制”和“移动”,Ractor#send(obj, move: true/false)
和Ractor.yield(obj, move: true/false)
(默认值为false
(复制))。
发送/接收端口¶ ↑
每个 Ractor
都有传入端口和传出端口。传入端口连接到无限大小的传入队列。
Ractor r +-------------------------------------------+ | incoming outgoing | | port port | r.send(obj) ->*->[incoming queue] Ractor.yield(obj) ->*-> r.take | | | | v | | Ractor.receive | +-------------------------------------------+ Connection example: r2.send obj on r1、Ractor.receive on r2 +----+ +----+ * r1 |---->* r2 * +----+ +----+ Connection example: Ractor.yield(obj) on r1, r1.take on r2 +----+ +----+ * r1 *---->- r2 * +----+ +----+ Connection example: Ractor.yield(obj) on r1 and r2, and waiting for both simultaneously by Ractor.select(r1, r2) +----+ * r1 *------+ +----+ | +----> Ractor.select(r1, r2) +----+ | * r2 *------| +----+
r = Ractor.new do msg = Ractor.receive # Receive from r's incoming queue msg # send back msg as block return value end r.send 'ok' # Send 'ok' to r's incoming port -> incoming queue r.take # Receive from r's outgoing port
最后一个示例显示了以下 Ractor 网络。
+------+ +---+ * main |------> * r *---+ +------+ +---+ | ^ | +-------------------+
并且可以通过为 Ractor.new
使用参数来简化此代码。
# Actual argument 'ok' for `Ractor.new()` will be sent to created Ractor. r = Ractor.new 'ok' do |msg| # Values for formal parameters will be received from incoming queue. # Similar to: msg = Ractor.receive msg # Return value of the given block will be sent via outgoing port end # receive from the r's outgoing port. r.take #=> `ok`
Ractor.new
的块的返回值¶ ↑
正如已经解释的那样,可以通过 Ractor#take
获取 Ractor.new
的返回值(Ractor.new{ expr }
中 expr
的计算值)。
Ractor.new{ 42 }.take #=> 42
当块返回值可用时,Ractor
已经死亡,因此除了接收的 Ractor
之外,没有其他 Ractor 可以触及该返回值,因此可以通过此通信路径发送任何值而无需任何修改。
r = Ractor.new do a = "hello" binding end r.take.eval("p a") #=> "hello" (other communication path can not send a Binding object directly)
使用 Ractor.select
等待多个 Ractor¶ ↑
你可以使用 Ractor.select(*ractors)
等待多个 Ractor 的 yield
。Ractor.select()
的返回值是 [r, msg]
,其中 r
是产生值的 Ractor
,msg
是产生的值。
等待单个 Ractor(与 Ractor.take
相同)
r1 = Ractor.new{'r1'} r, obj = Ractor.select(r1) r == r1 and obj == 'r1' #=> true
等待两个 Ractor
r1 = Ractor.new{'r1'} r2 = Ractor.new{'r2'} rs = [r1, r2] as = [] # Wait for r1 or r2's Ractor.yield r, obj = Ractor.select(*rs) rs.delete(r) as << obj # Second try (rs only contain not-closed ractors) r, obj = Ractor.select(*rs) rs.delete(r) as << obj as.sort == ['r1', 'r2'] #=> true
Complex
示例
pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end RN = 10 rs = RN.times.map{|i| Ractor.new pipe, i do |pipe, i| msg = pipe.take msg # ping-pong end } RN.times{|i| pipe << i } RN.times.map{ r, n = Ractor.select(*rs) rs.delete r n }.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
多个 Ractor 可以向一个 Ractor
发送消息。
# Create 10 ractors and they send objects to pipe ractor. # pipe ractor yield received objects pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end RN = 10 rs = RN.times.map{|i| Ractor.new pipe, i do |pipe, i| pipe << i end } RN.times.map{ pipe.take }.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
待办事项:当前的 Ractor.select()
存在与 select(2)
相同的问题,因此应该改进此接口。
待办事项:Go 语言的 select
语法使用轮询技术来实现公平调度。现在 Ractor.select()
不使用它。
关闭 Ractor 的端口¶ ↑
-
Ractor#close_incoming/outgoing
关闭传入/传出端口(类似于Queue#close
)。 -
Ractor#close_incoming
-
如果
r
的传入端口已关闭,则r.send(obj)
将会引发异常。 -
当传入队列为空且传入端口关闭时,
Ractor.receive
会引发异常。如果传入队列不为空,它会在不引发异常的情况下移除一个对象。 -
Ractor#close_outgoing
-
在关闭传出端口的
Ractor
上调用Ractor.yield
将会引发异常。 -
对关闭传出端口的
Ractor
调用Ractor#take
将会引发异常。如果Ractor#take
正在阻塞,则会引发异常。 -
当
Ractor
终止时,端口会自动关闭。 -
即使实现终止了基于本地线程的线程,Ractor 的块的返回值也会作为
Ractor.yield(ret_val)
产生。
示例(尝试从关闭的 Ractor
获取)
r = Ractor.new do 'finish' end r.take # success (will return 'finish') begin o = r.take # try to take from closed Ractor rescue Ractor::ClosedError 'ok' else "ng: #{o}" end
示例(尝试向关闭(已终止)的 Ractor
发送消息)
r = Ractor.new do end r.take # wait terminate begin r.send(1) rescue Ractor::ClosedError 'ok' else 'ng' end
当多个 Ractor 正在等待 Ractor.yield()
时,Ractor#close_outgoing
将通过引发异常 (ClosedError
) 来取消所有阻塞。
通过复制发送消息¶ ↑
如果 obj
是不可共享的对象,则 Ractor#send(obj)
或 Ractor.yield(obj)
会深度复制 obj
。
obj = 'str'.dup r = Ractor.new obj do |msg| # return received msg's object_id msg.object_id end obj.object_id == r.take #=> false
一些对象不支持复制其值,并且会引发异常。
obj = Thread.new{} begin Ractor.new obj do |msg| msg end rescue TypeError => e e.message #=> #<TypeError: allocator undefined for Thread> else 'ng' # unreachable here end
通过移动发送消息¶ ↑
Ractor#send(obj, move: true)
或 Ractor.yield(obj, move: true)
将 obj
移动到目标 Ractor
。如果源 Ractor
访问了移动的对象(例如,调用 obj.foo()
之类的方法),则会发生错误。
# move with Ractor#send r = Ractor.new do obj = Ractor.receive obj << ' world' end str = 'hello' r.send str, move: true modified = r.take #=> 'hello world' # str is moved, and accessing str from this Ractor is prohibited begin # Error because it touches moved str. str << ' exception' # raise Ractor::MovedError rescue Ractor::MovedError modified #=> 'hello world' else raise 'unreachable' end
# move with Ractor.yield r = Ractor.new do obj = 'hello' Ractor.yield obj, move: true obj << 'world' # raise Ractor::MovedError end str = r.take begin r.take rescue Ractor::RemoteError p str #=> "hello" end
一些对象不支持移动,并且会引发异常。
r = Ractor.new do Ractor.receive end r.send(Thread.new{}, move: true) #=> allocator undefined for Thread (TypeError)
为了实现对移动对象的访问禁止,使用了类替换技术来实现。
可共享对象¶ ↑
以下对象是可共享的。
-
不可变对象
-
小整数、一些符号、
true
、false
、nil
(又名内部的SPECIAL_CONST_P()
对象) -
冻结的本地对象
-
Numeric
对象:Float
、Complex
、Rational
、大整数(内部的T_BIGNUM
) -
所有符号。
-
-
冻结的
String
和Regexp
对象(它们的实例变量应仅引用可共享对象) -
Ractor
和其他关注同步的特殊对象。
实现:现在可共享对象 (RVALUE
) 具有 FL_SHAREABLE
标志。该标志可以延迟添加。
为了创建可共享对象,提供了 Ractor.make_shareable(obj)
方法。在这种情况下,它会尝试通过冻结 obj
和递归遍历的对象使其成为可共享对象。此方法接受 copy:
关键字(默认值为 false)。Ractor.make_shareable(obj, copy: true)
尝试深度复制 obj
并使复制的对象成为可共享对象。
在 Ractor 之间隔离不可共享对象的语言更改¶ ↑
为了在 Ractor 之间隔离不可共享的对象,我们在多 Ractor Ruby 程序中引入了额外的语言语义。
请注意,在不使用 Ractor 的情况下,不需要这些额外的语义(与 Ruby 2 100% 兼容)。
全局变量¶ ↑
只有主 Ractor
(在解释器启动时创建的 Ractor
)才能访问全局变量。
$gv = 1 r = Ractor.new do $gv end begin r.take rescue Ractor::RemoteError => e e.cause.message #=> 'can not access global variables from non-main Ractors' end
请注意,一些特殊的全局变量是 Ractor 本地的,例如 $stdin
、$stdout
、$stderr
。有关更多详细信息,请参阅 [Bug #17268]。
可共享对象的实例变量¶ ↑
如果引用的值是可共享对象,则可以从非主 Ractor 获取类/模块的实例变量。
class C @iv = 1 end p Ractor.new do class C @iv end end.take #=> 1
否则,只有主 Ractor
才能访问可共享对象的实例变量。
class C @iv = [] # unshareable object end Ractor.new do class C begin p @iv rescue Ractor::IsolationError p $!.message #=> "can not get unshareable values from instance variables of classes/modules from non-main Ractors" end begin @iv = 42 rescue Ractor::IsolationError p $!.message #=> "can not set instance variables of classes/modules by non-main Ractors" end end end.take
shared = Ractor.new{} shared.instance_variable_set(:@iv, 'str') r = Ractor.new shared do |shared| p shared.instance_variable_get(:@iv) end begin r.take rescue Ractor::RemoteError => e e.cause.message #=> can not access instance variables of shareable objects from non-main Ractors (Ractor::IsolationError) end
请注意,Ractor 上也禁止类/模块对象的实例变量。
Class
变量¶ ↑
只有主 Ractor
才能访问类变量。
class C @@cv = 'str' end r = Ractor.new do class C p @@cv end end begin r.take rescue => e e.class #=> Ractor::IsolationError end
常量¶ ↑
只有主 Ractor
才能读取引用不可共享对象的常量。
class C CONST = 'str' end r = Ractor.new do C::CONST end begin r.take rescue => e e.class #=> Ractor::IsolationError end
只有主 Ractor
才能定义引用不可共享对象的常量。
class C end r = Ractor.new do C::CONST = 'str' end begin r.take rescue => e e.class #=> Ractor::IsolationError end
为了创建支持多 Ractor 的库,常量应该仅引用可共享对象。
TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}
在这种情况下,TABLE
引用一个不可共享的 Hash
对象。因此,其他 Ractor 不能引用 TABLE
常量。为了使其可共享,我们可以像这样使用 Ractor.make_shareable()
。
TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )
为了简化操作,Ruby 3.0 引入了新的 shareable_constant_value
指令。
# shareable_constant_value: literal TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'} #=> Same as: TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )
shareable_constant_value
指令接受以下模式(描述使用示例:CONST = expr
)
-
none:不执行任何操作。与以下操作相同:
CONST = expr
-
literal
-
如果
expr
由字面量组成,则替换为CONST = Ractor.make_shareable(expr)
。 -
否则:替换为
CONST = expr.tap{|o| raise unless Ractor.shareable?(o)}
。 -
experimental_everything:替换为
CONST = Ractor.make_shareable(expr)
。 -
experimental_copy:替换为
CONST = Ractor.make_shareable(expr, copy: true)
。
除了 none
模式(默认值)之外,保证了分配的常量仅引用可共享对象。
有关更多详细信息,请参阅 doc/syntax/comments.rdoc。
实现说明¶ ↑
-
每个
Ractor
都有自己的 ID (rb_ractor_t::pub::id
)。 -
在调试模式下,所有不可共享的对象都用当前 Ractor 的 id 标记,并在 VM 中进行检查以检测不可共享对象的泄漏(从不同的
Ractor
访问对象)。
示例¶ ↑
Actor 模型中的传统环形示例¶ ↑
RN = 1_000 CR = Ractor.current r = Ractor.new do p Ractor.receive CR << :fin end RN.times{ r = Ractor.new r do |next_r| next_r << Ractor.receive end } p :setup_ok r << 1 p Ractor.receive
Fork-join¶ ↑
def fib n if n < 2 1 else fib(n-2) + fib(n-1) end end RN = 10 rs = (1..RN).map do |i| Ractor.new i do |i| [i, fib(i)] end end until rs.empty? r, v = Ractor.select(*rs) rs.delete r p answer: v end
工作池¶ ↑
require 'prime' pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end N = 1000 RN = 10 workers = (1..RN).map do Ractor.new pipe do |pipe| while n = pipe.take Ractor.yield [n, n.prime?] end end end (1..N).each{|i| pipe << i } pp (1..N).map{ _r, (n, b) = Ractor.select(*workers) [n, b] }.sort_by{|(n, b)| n}
管道¶ ↑
# pipeline with yield/take r1 = Ractor.new do 'r1' end r2 = Ractor.new r1 do |r1| r1.take + 'r2' end r3 = Ractor.new r2 do |r2| r2.take + 'r3' end p r3.take #=> 'r1r2r3'
# pipeline with send/receive r3 = Ractor.new Ractor.current do |cr| cr.send Ractor.receive + 'r3' end r2 = Ractor.new r3 do |r3| r3.send Ractor.receive + 'r2' end r1 = Ractor.new r2 do |r2| r2.send Ractor.receive + 'r1' end r1 << 'r0' p Ractor.receive #=> "r0r1r2r3"
监控¶ ↑
# ring example again r = Ractor.current (1..10).map{|i| r = Ractor.new r, i do |r, i| r.send Ractor.receive + "r#{i}" end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
# ring example with an error r = Ractor.current rs = (1..10).map{|i| r = Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1" r.send "r0" p Ractor.select(*rs, Ractor.current) #=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"] r.send "e0" p Ractor.select(*rs, Ractor.current) #=> # <Thread:0x000056262de28bd8 run> terminated with exception (report_on_exception is true): # Traceback (most recent call last): # 2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>' # 1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop' # /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception # Traceback (most recent call last): # 2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>' # 1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop' # /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception # 1: from /home/ko1/src/ruby/trunk/test.rb:21:in `<main>' # <internal:ractor>:69:in `select': thrown by remote Ractor. (Ractor::RemoteError)
# resend non-error message r = Ractor.current rs = (1..10).map{|i| r = Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end } r.send "r0" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1" r.send "r0" p Ractor.select(*rs, Ractor.current) [:receive, "r0r10r9r8r7r6r5r4r3r2r1"] msg = 'e0' begin r.send msg p Ractor.select(*rs, Ractor.current) rescue Ractor::RemoteError msg = 'r0' retry end #=> <internal:ractor>:100:in `send': The incoming-port is already closed (Ractor::ClosedError) # because r == r[-1] is terminated.
# ring example with supervisor and re-start def make_ractor r, i Ractor.new r, i do |r, i| loop do msg = Ractor.receive raise if /e/ =~ msg r.send msg + "r#{i}" end end end r = Ractor.current rs = (1..10).map{|i| r = make_ractor(r, i) } msg = 'e0' # error causing message begin r.send msg p Ractor.select(*rs, Ractor.current) rescue Ractor::RemoteError r = rs[-1] = make_ractor(rs[-2], rs.size-1) msg = 'x0' retry end #=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]