class Thread
线程是 Ruby 并发编程模型的实现。
需要多个执行线程的程序是 Ruby Thread
类的完美候选者。
例如,我们可以使用 ::new
创建一个与主线程执行分离的新线程。
thr = Thread.new { puts "What's the big deal" }
然后我们可以暂停主线程的执行,并使用 join
让新线程完成。
thr.join #=> "What's the big deal"
如果我们不在主线程终止之前调用 thr.join
,则包括 thr
在内的所有其他线程都将被终止。
或者,你可以使用数组一次处理多个线程,如下例所示
threads = [] threads << Thread.new { puts "What's the big deal" } threads << Thread.new { 3.times { puts "Threads are fun!" } }
创建几个线程后,我们等待它们全部依次完成。
threads.each { |thr| thr.join }
要检索线程的最后一个值,请使用 value
thr = Thread.new { sleep 1; "Useful value" } thr.value #=> "Useful value"
Thread
初始化¶ ↑
为了创建新线程,Ruby 提供了 ::new
、::start
和 ::fork
。每个方法都必须提供一个代码块,否则将引发 ThreadError
。
当子类化 Thread
类时,子类的 initialize
方法将被 ::start
和 ::fork
忽略。否则,请确保在 initialize
方法中调用 super。
Thread
终止¶ ↑
为了终止线程,Ruby 提供了多种方法。
类方法 ::kill
用于退出给定的线程
thr = Thread.new { sleep } Thread.kill(thr) # sends exit() to thr
或者,你可以使用实例方法 exit
或其任何别名 kill
或 terminate
。
thr.exit
Thread
状态¶ ↑
Ruby 提供了一些实例方法来查询给定线程的状态。要获取包含当前线程状态的字符串,请使用 status
thr = Thread.new { sleep } thr.status # => "sleep" thr.exit thr.status # => false
你还可以使用 alive?
来判断线程是正在运行还是休眠,以及使用 stop?
来判断线程是已死还是休眠。
Thread
变量和作用域¶ ↑
由于线程是使用代码块创建的,因此相同的规则适用于其他 Ruby 代码块的变量作用域。在此代码块中创建的任何局部变量都只能由此线程访问。
纤程局部变量 vs. 线程局部变量¶ ↑
每个纤程都有自己的 Thread#[]
存储桶。当你设置新的纤程局部变量时,它只能在此 Fiber
中访问。为了说明这一点
Thread.new { Thread.current[:foo] = "bar" Fiber.new { p Thread.current[:foo] # => nil }.resume }.join
此示例使用 []
获取和 []=
设置纤程局部变量,你还可以使用 keys
列出给定线程的纤程局部变量,并使用 key?
检查纤程局部变量是否存在。
当涉及到线程局部变量时,它们可以在线程的整个作用域内访问。考虑到以下示例
Thread.new{ Thread.current.thread_variable_set(:foo, 1) p Thread.current.thread_variable_get(:foo) # => 1 Fiber.new{ Thread.current.thread_variable_set(:foo, 2) p Thread.current.thread_variable_get(:foo) # => 2 }.resume p Thread.current.thread_variable_get(:foo) # => 2 }.join
你可以看到线程局部变量 :foo
被带入纤程,并在线程结束时被更改为 2
。
此示例使用 thread_variable_set
创建新的线程局部变量,并使用 thread_variable_get
引用它们。
还有 thread_variables
列出所有线程局部变量,以及 thread_variable?
检查给定的线程局部变量是否存在。
Exception
处理¶ ↑
当在线程内部引发未处理的异常时,它将终止。默认情况下,此异常不会传播到其他线程。该异常被存储,当另一个线程调用 value
或 join
时,该异常将在该线程中重新引发。
t = Thread.new{ raise 'something went wrong' } t.value #=> RuntimeError: something went wrong
可以使用 Thread#raise
实例方法从线程外部引发异常,该方法采用与 Kernel#raise
相同的参数。
设置 Thread.abort_on_exception
= true、Thread#abort_on_exception
= true 或 $DEBUG = true 将导致在线程中引发的后续未处理异常在主线程中自动重新引发。
随着类方法 ::handle_interrupt
的添加,你现在可以使用线程异步处理异常。
调度¶ ↑
Ruby 提供了一些方法来支持在你的程序中调度线程。
第一种方法是使用类方法 ::stop
,使当前正在运行的线程进入休眠状态并调度另一个线程的执行。
一旦线程进入休眠状态,你可以使用实例方法 wakeup
将你的线程标记为符合调度条件。
你也可以尝试 ::pass
,它尝试将执行传递给另一个线程,但这取决于操作系统是否会切换正在运行的线程。 priority
也是如此,它允许你提示线程调度程序在传递执行时你希望哪个线程优先。此方法也依赖于操作系统,并且在某些平台上可能会被忽略。
公共类方法
返回全局“异常时中止”条件的状态。
默认值为 false
。
当设置为 true
时,如果任何线程因异常而中止,则引发的异常将在主线程中重新引发。
也可以由全局 $DEBUG 标志或命令行选项 -d
指定。
另请参见 ::abort_on_exception=
。
还有一个实例级别的方法可以为特定线程设置此值,请参阅 abort_on_exception
。
static VALUE rb_thread_s_abort_exc(VALUE _) { return RBOOL(GET_THREAD()->vm->thread_abort_on_exception); }
当设置为 true
时,如果任何线程因异常而中止,则引发的异常将在主线程中重新引发。返回新状态。
Thread.abort_on_exception = true t1 = Thread.new do puts "In new thread" raise "Exception from thread" end sleep(1) puts "not reached"
这将产生
In new thread prog.rb:4: Exception from thread (RuntimeError) from prog.rb:2:in `initialize' from prog.rb:2:in `new' from prog.rb:2
另请参见 ::abort_on_exception
。
还有一个实例级别的方法可以为特定线程设置此值,请参阅 abort_on_exception=
。
static VALUE rb_thread_s_abort_exc_set(VALUE self, VALUE val) { GET_THREAD()->vm->thread_abort_on_exception = RTEST(val); return val; }
返回当前正在执行的线程。
Thread.current #=> #<Thread:0x401bdf4c run>
static VALUE thread_s_current(VALUE klass) { return rb_thread_current(); }
将当前执行堆栈的每一帧作为回溯位置对象生成。
static VALUE each_caller_location(int argc, VALUE *argv, VALUE _) { rb_execution_context_t *ec = GET_EC(); long n, lev = ec_backtrace_range(ec, argc, argv, 1, 1, &n); if (lev >= 0 && n != 0) { rb_ec_partial_backtrace_object(ec, lev, n, NULL, FALSE, TRUE); } return Qnil; }
基本上与 ::new
相同。但是,如果 Thread
类被子类化,则在该子类中调用 start
将不会调用子类的 initialize
方法。
static VALUE thread_start(VALUE klass, VALUE args) { struct thread_create_params params = { .type = thread_invoke_type_proc, .args = args, .proc = rb_block_proc(), }; return thread_create_core(rb_thread_alloc(klass), ¶ms); }
更改异步中断的时序。
中断表示异步事件和 Thread#raise
、Thread#kill
、信号捕获(尚不支持)和主线程终止(如果主线程终止,则所有其他线程都将被终止)的相应过程。
给定的 hash
具有类似于 ExceptionClass => :TimingSymbol
的对。其中 ExceptionClass 是由给定代码块处理的中断。TimingSymbol 可以是以下符号之一
:immediate
-
立即调用中断。
:on_blocking
-
在 BlockingOperation 时调用中断。
:never
-
从不调用所有中断。
BlockingOperation 表示该操作将阻塞调用线程,例如读取和写入。在 CRuby 实现中,BlockingOperation 是在没有 GVL 的情况下执行的任何操作。
被屏蔽的异步中断会延迟到启用为止。此方法类似于 sigprocmask(3)。
注意¶ ↑
异步中断很难使用。
如果需要在线程之间通信,请考虑使用其他方式,例如 Queue
。
或者在深入了解此方法的基础上使用它们。
用法¶ ↑
在此示例中,我们可以防止 Thread#raise
异常。
使用 :never
TimingSymbol,在主线程的第一个代码块中,RuntimeError
异常将始终被忽略。在第二个 ::handle_interrupt
代码块中,我们可以有目的地处理 RuntimeError
异常。
th = Thread.new do Thread.handle_interrupt(RuntimeError => :never) { begin # You can write resource allocation code safely. Thread.handle_interrupt(RuntimeError => :immediate) { # ... } ensure # You can write resource deallocation code safely. end } end Thread.pass # ... th.raise "stop"
在我们忽略 RuntimeError
异常时,可以安全地编写资源分配代码。然后,ensure 代码块是我们安全地释放资源的地方。
堆栈控制设置¶ ↑
可以堆叠多个级别的 ::handle_interrupt
代码块,以便一次控制多个 ExceptionClass 和 TimingSymbol。
Thread.handle_interrupt(FooError => :never) { Thread.handle_interrupt(BarError => :never) { # FooError and BarError are prohibited. } }
ExceptionClass 的继承¶ ↑
所有从 ExceptionClass 参数继承的异常都将被考虑在内。
Thread.handle_interrupt(Exception => :never) { # all exceptions inherited from Exception are prohibited. }
为了处理所有中断,请使用 Object
而不是 Exception
作为 ExceptionClass,因为 kill/terminate 中断不由 Exception
处理。
static VALUE rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg) { VALUE mask = Qundef; rb_execution_context_t * volatile ec = GET_EC(); rb_thread_t * volatile th = rb_ec_thread_ptr(ec); volatile VALUE r = Qnil; enum ruby_tag_type state; if (!rb_block_given_p()) { rb_raise(rb_eArgError, "block is needed."); } mask_arg = rb_to_hash_type(mask_arg); if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) { mask = Qnil; } rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask); if (UNDEF_P(mask)) { return rb_yield(Qnil); } if (!RTEST(mask)) { mask = mask_arg; } else if (RB_TYPE_P(mask, T_HASH)) { OBJ_FREEZE(mask); } rb_ary_push(th->pending_interrupt_mask_stack, mask); if (!rb_threadptr_pending_interrupt_empty_p(th)) { th->pending_interrupt_queue_checked = 0; RUBY_VM_SET_INTERRUPT(th->ec); } EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { r = rb_yield(Qnil); } EC_POP_TAG(); rb_ary_pop(th->pending_interrupt_mask_stack); if (!rb_threadptr_pending_interrupt_empty_p(th)) { th->pending_interrupt_queue_checked = 0; RUBY_VM_SET_INTERRUPT(th->ec); } RUBY_VM_CHECK_INTS(th->ec); if (state) { EC_JUMP_TAG(th->ec, state); } return r; }
返回全局“忽略死锁”条件的状态。默认值为 false
,因此不会忽略死锁条件。
另请参见 ::ignore_deadlock=
。
static VALUE rb_thread_s_ignore_deadlock(VALUE _) { return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock); }
返回新状态。设置为 true
时,VM 将不检查死锁条件。仅当您的应用程序可以通过其他方式(例如信号)来打破死锁条件时,设置此选项才有用。
Thread.ignore_deadlock = true queue = Thread::Queue.new trap(:SIGUSR1){queue.push "Received signal"} # raises fatal error unless ignoring deadlock puts queue.pop
另请参见 ::ignore_deadlock
。
static VALUE rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val) { GET_THREAD()->vm->thread_ignore_deadlock = RTEST(val); return val; }
导致给定的 thread
退出,另请参见 Thread::exit
。
count = 0 a = Thread.new { loop { count += 1 } } sleep(0.1) #=> 0 Thread.kill(a) #=> #<Thread:0x401b3d30 dead> count #=> 93947 a.alive? #=> false
static VALUE rb_thread_s_kill(VALUE obj, VALUE th) { return rb_thread_kill(th); }
返回一个 Thread
对象数组,该数组包含所有可运行或已停止的线程。
Thread.new { sleep(200) } Thread.new { 1000000.times {|i| i*i } } Thread.new { Thread.stop } Thread.list.each {|t| p t}
这将产生
#<Thread:0x401b3e84 sleep> #<Thread:0x401b3f38 run> #<Thread:0x401b3fb0 sleep> #<Thread:0x401bdf4c run>
static VALUE thread_list(VALUE _) { return rb_thread_list(); }
返回主线程。
static VALUE rb_thread_s_main(VALUE klass) { return rb_thread_main(); }
创建一个新线程来执行给定的块。
传递给 ::new
的任何 args
都将传递给该块。
arr = [] a, b, c = 1, 2, 3 Thread.new(a,b,c) { |d,e,f| arr << d << e << f }.join arr #=> [1, 2, 3]
如果调用 ::new
时没有块,则会引发 ThreadError
异常。
如果要继承 Thread
,请务必在 initialize
方法中调用 super,否则会引发 ThreadError
。
static VALUE thread_s_new(int argc, VALUE *argv, VALUE klass) { rb_thread_t *th; VALUE thread = rb_thread_alloc(klass); if (GET_RACTOR()->threads.main->status == THREAD_KILLED) { rb_raise(rb_eThreadError, "can't alloc thread"); } rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS); th = rb_thread_ptr(thread); if (!threadptr_initialized(th)) { rb_raise(rb_eThreadError, "uninitialized thread - check '%"PRIsVALUE"#initialize'", klass); } return thread; }
给线程调度程序一个提示,将执行传递给另一个线程。正在运行的线程可能会也可能不会切换,这取决于操作系统和处理器。
static VALUE thread_s_pass(VALUE klass) { rb_thread_schedule(); return Qnil; }
返回异步队列是否为空。
由于可以使用 Thread::handle_interrupt
来延迟异步事件,因此可以使用此方法来确定是否有任何延迟的事件。
如果发现此方法返回 true,则可以完成 :never
代码块。
例如,以下方法立即处理延迟的异步事件。
def Thread.kick_interrupt_immediately Thread.handle_interrupt(Object => :immediate) { Thread.pass } end
如果给出了 error
,则仅检查 error
类型的延迟事件。
用法¶ ↑
th = Thread.new{ Thread.handle_interrupt(RuntimeError => :on_blocking){ while true ... # reach safe point to invoke interrupt if Thread.pending_interrupt? Thread.handle_interrupt(Object => :immediate){} end ... end } } ... th.raise # stop thread
此示例也可以编写为以下形式,您应该使用它来避免异步中断。
flag = true th = Thread.new{ Thread.handle_interrupt(RuntimeError => :on_blocking){ while true ... # reach safe point to invoke interrupt break if flag == false ... end } } ... flag = false # stop thread
static VALUE rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self) { return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self); }
返回全局“报告异常”条件的状态。
自 Ruby 2.5 起,默认值为 true
。
如果此标志为 true,则所有创建的线程将在异常导致线程终止时在 $stderr 上报告一条消息。
Thread.new { 1.times { raise } }
将在 $stderr 上产生以下输出
#<Thread:...> terminated with exception (report_on_exception is true): Traceback (most recent call last): 2: from -e:1:in `block in <main>' 1: from -e:1:in `times'
这样做是为了尽早捕获线程中的错误。在某些情况下,您可能不希望出现此额外输出。有多种方法可以避免额外的输出
-
如果异常不是预期的,最好的方法是修复异常的原因,使其不再发生。
-
如果异常是预期的,则最好在引发异常的位置附近捕获它,而不是让它终止
Thread
。 -
如果保证
Thread
将通过Thread#join
或Thread#value
连接,则在启动Thread
时,可以使用Thread.current.report_on_exception = false
安全地禁用此报告。但是,这可能会在很久之后才处理异常,或者如果由于父线程被阻塞等原因而永远不连接Thread
,则可能根本不处理异常。
另请参见 ::report_on_exception=
。
还有一个实例级别的方法可以为特定线程设置此选项,请参见 report_on_exception=
。
static VALUE rb_thread_s_report_exc(VALUE _) { return RBOOL(GET_THREAD()->vm->thread_report_on_exception); }
返回新状态。设置为 true
时,之后创建的所有线程都将继承该条件,并且如果异常导致线程终止,则会在 $stderr 上报告一条消息。
Thread.report_on_exception = true t1 = Thread.new do puts "In new thread" raise "Exception from thread" end sleep(1) puts "In the main thread"
这将产生
In new thread #<Thread:...prog.rb:2> terminated with exception (report_on_exception is true): Traceback (most recent call last): prog.rb:4:in `block in <main>': Exception from thread (RuntimeError) In the main thread
另请参见 ::report_on_exception
。
还有一个实例级别的方法可以为特定线程设置此选项,请参见 report_on_exception=
。
static VALUE rb_thread_s_report_exc_set(VALUE self, VALUE val) { GET_THREAD()->vm->thread_report_on_exception = RTEST(val); return val; }
基本上与 ::new
相同。但是,如果 Thread
类被子类化,则在该子类中调用 start
将不会调用子类的 initialize
方法。
static VALUE thread_start(VALUE klass, VALUE args) { struct thread_create_params params = { .type = thread_invoke_type_proc, .args = args, .proc = rb_block_proc(), }; return thread_create_core(rb_thread_alloc(klass), ¶ms); }
停止当前线程的执行,使其进入“睡眠”状态,并安排执行另一个线程。
a = Thread.new { print "a"; Thread.stop; print "c" } sleep 0.1 while a.status!='sleep' print "b" a.run a.join #=> "abc"
static VALUE thread_stop(VALUE _) { return rb_thread_stop(); }
公共实例方法
属性引用—使用符号或字符串名称返回纤程局部变量的值(如果未显式在 Fiber
中,则为当前线程的根纤程)。如果指定的变量不存在,则返回 nil
。
[ Thread.new { Thread.current["name"] = "A" }, Thread.new { Thread.current[:name] = "B" }, Thread.new { Thread.current["name"] = "C" } ].each do |th| th.join puts "#{th.inspect}: #{th[:name]}" end
这将产生
#<Thread:0x00000002a54220 dead>: A #<Thread:0x00000002a541a8 dead>: B #<Thread:0x00000002a54130 dead>: C
Thread#[]
和 Thread#[]=
不是线程局部的,而是纤程局部的。这种混淆在 Ruby 1.8 中不存在,因为纤程仅在 Ruby 1.9 中可用。Ruby 1.9 选择让这些方法的行为是纤程局部的,以保存以下用于动态作用域的习惯用法。
def meth(newvalue) begin oldvalue = Thread.current[:name] Thread.current[:name] = newvalue yield ensure Thread.current[:name] = oldvalue end end
如果这些方法是线程局部的,并且给定的块切换了纤程,则该习惯用法可能无法作为动态作用域工作。
f = Fiber.new { meth(1) { Fiber.yield } } meth(2) { f.resume } f.resume p Thread.current[:name] #=> nil if fiber-local #=> 2 if thread-local (The value 2 is leaked to outside of meth method.)
对于线程局部变量,请参见 thread_variable_get
和 thread_variable_set
。
static VALUE rb_thread_aref(VALUE thread, VALUE key) { ID id = rb_check_id(&key); if (!id) return Qnil; return rb_thread_local_aref(thread, id); }
属性赋值—使用符号或字符串设置或创建纤程局部变量的值。
另请参见 Thread#[]
。
对于线程局部变量,请参见 thread_variable_set
和 thread_variable_get
。
static VALUE rb_thread_aset(VALUE self, VALUE id, VALUE val) { return rb_thread_local_aset(self, rb_to_id(id), val); }
返回此 thr
的线程局部“异常时中止”条件的状态。
默认值为 false
。
另请参见 abort_on_exception=
。
还有一个类级别的方法可以为所有线程设置此选项,请参见 ::abort_on_exception
。
static VALUE rb_thread_abort_exc(VALUE thread) { return RBOOL(rb_thread_ptr(thread)->abort_on_exception); }
设置为 true
时,如果此 thr
因异常而中止,则引发的异常将在主线程中重新引发。
另请参见 abort_on_exception
。
还有一个类级别的方法可以为所有线程设置此选项,请参见 ::abort_on_exception=
。
static VALUE rb_thread_abort_exc_set(VALUE thread, VALUE val) { rb_thread_ptr(thread)->abort_on_exception = RTEST(val); return val; }
添加 proc 作为跟踪的处理程序。
请参见 Thread#set_trace_func
和 Kernel#set_trace_func
。
static VALUE thread_add_trace_func_m(VALUE obj, VALUE trace) { thread_add_trace_func(GET_EC(), rb_thread_ptr(obj), trace); return trace; }
返回目标线程的当前回溯。
static VALUE rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval) { return rb_vm_thread_backtrace(argc, argv, thval); }
返回目标线程的执行堆栈 — 一个包含回溯位置对象的数组。
有关更多信息,请参见 Thread::Backtrace::Location
。
此方法的行为类似于 Kernel#caller_locations
,只不过它适用于特定线程。
static VALUE rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval) { return rb_vm_thread_backtrace_locations(argc, argv, thval); }
返回给定键的纤程局部变量。如果找不到键,则有几种选择:如果没有其他参数,它将引发 KeyError
异常;如果给定了 default,则返回该值;如果指定了可选的代码块,则将运行该代码块并返回其结果。请参见 Thread#[]
和 Hash#fetch
。
static VALUE rb_thread_fetch(int argc, VALUE *argv, VALUE self) { VALUE key, val; ID id; rb_thread_t *target_th = rb_thread_ptr(self); int block_given; rb_check_arity(argc, 1, 2); key = argv[0]; block_given = rb_block_given_p(); if (block_given && argc == 2) { rb_warn("block supersedes default value argument"); } id = rb_check_id(&key); if (id == recursive_key) { return target_th->ec->local_storage_recursive_hash; } else if (id && target_th->ec->local_storage && rb_id_table_lookup(target_th->ec->local_storage, id, &val)) { return val; } else if (block_given) { return rb_yield(key); } else if (argc == 1) { rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key); } else { return argv[1]; } }
返回包含给定线程的 ThreadGroup
。
Thread.main.group #=> #<ThreadGroup:0x4029d914>
VALUE rb_thread_group(VALUE thread) { return rb_thread_ptr(thread)->thgroup; }
调用线程将暂停执行并运行此 thr
。
在 thr
退出或经过给定的 limit
秒之前不会返回。
如果时间限制到期,将返回 nil
,否则返回 thr
。
当主程序退出时,任何未加入的线程都将被终止。
如果 thr
之前引发了异常,并且未设置 ::abort_on_exception
或 $DEBUG 标志(因此尚未处理异常),则此时将处理该异常。
a = Thread.new { print "a"; sleep(10); print "b"; print "c" } x = Thread.new { print "x"; Thread.pass; print "y"; print "z" } x.join # Let thread x finish, thread a will be killed on exit. #=> "axyz"
以下示例说明了 limit
参数。
y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }} puts "Waiting" until y.join(0.15)
这将产生
tick... Waiting tick... Waiting tick... tick...
static VALUE thread_join_m(int argc, VALUE *argv, VALUE self) { VALUE timeout = Qnil; rb_hrtime_t rel = 0, *limit = 0; if (rb_check_arity(argc, 0, 1)) { timeout = argv[0]; } // Convert the timeout eagerly, so it's always converted and deterministic /* * This supports INFINITY and negative values, so we can't use * rb_time_interval right now... */ if (NIL_P(timeout)) { /* unlimited */ } else if (FIXNUM_P(timeout)) { rel = rb_sec2hrtime(NUM2TIMET(timeout)); limit = &rel; } else { limit = double2hrtime(&rel, rb_num2dbl(timeout)); } return thread_join(rb_thread_ptr(self), timeout, limit); }
如果给定的字符串(或符号)作为纤程局部变量存在,则返回 true
。
me = Thread.current me[:oliver] = "a" me.key?(:oliver) #=> true me.key?(:stanley) #=> false
static VALUE rb_thread_key_p(VALUE self, VALUE key) { VALUE val; ID id = rb_check_id(&key); struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage; if (!id || local_storage == NULL) { return Qfalse; } return RBOOL(rb_id_table_lookup(local_storage, id, &val)); }
返回纤程局部变量名称的数组(作为符号)。
thr = Thread.new do Thread.current[:cat] = 'meow' Thread.current["dog"] = 'woof' end thr.join #=> #<Thread:0x401b3f10 dead> thr.keys #=> [:dog, :cat]
static VALUE rb_thread_keys(VALUE self) { struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage; VALUE ary = rb_ary_new(); if (local_storage) { rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary); } return ary; }
终止 thr
并计划运行另一个线程,返回已终止的 Thread
。如果这是主线程或最后一个线程,则退出进程。
VALUE rb_thread_kill(VALUE thread) { rb_thread_t *target_th = rb_thread_ptr(thread); if (target_th->to_kill || target_th->status == THREAD_KILLED) { return thread; } if (target_th == target_th->vm->ractor.main_thread) { rb_exit(EXIT_SUCCESS); } RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(target_th)); if (target_th == GET_THREAD()) { /* kill myself immediately */ rb_threadptr_to_kill(target_th); } else { threadptr_check_pending_interrupt_queue(target_th); rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED); rb_threadptr_interrupt(target_th); } return thread; }
显示线程的名称。
static VALUE rb_thread_getname(VALUE thread) { return rb_thread_ptr(thread)->name; }
将给定名称设置为 ruby 线程。在某些平台上,它可能会将名称设置为 pthread 和/或内核。
static VALUE rb_thread_setname(VALUE thread, VALUE name) { rb_thread_t *target_th = rb_thread_ptr(thread); if (!NIL_P(name)) { rb_encoding *enc; StringValueCStr(name); enc = rb_enc_get(name); if (!rb_enc_asciicompat(enc)) { rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)", rb_enc_name(enc)); } name = rb_str_new_frozen(name); } target_th->name = name; if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) { native_set_another_thread_name(target_th->nt->thread_id, name); } return name; }
返回 Ruby 线程使用的本机线程 ID。
ID 取决于操作系统。(不是 pthread_self(3) 返回的 POSIX 线程 ID)
-
在 Linux 上,它是 gettid(2) 返回的 TID。
-
在 macOS 上,它是 pthread_threadid_np(3) 返回的系统范围内唯一的线程整数 ID。
-
在 FreeBSD 上,它是 pthread_getthreadid_np(3) 返回的线程唯一整数 ID。
-
在 Windows 上,它是 GetThreadId() 返回的线程标识符。
-
在其他平台上,它会引发
NotImplementedError
。
注意:如果线程尚未关联或已与本机线程解除关联,则返回 nil。如果 Ruby 实现使用 M:N 线程模型,则 ID 可能会根据时序而更改。
static VALUE rb_thread_native_thread_id(VALUE thread) { rb_thread_t *target_th = rb_thread_ptr(thread); if (rb_threadptr_dead(target_th)) return Qnil; return native_thread_native_thread_id(target_th); }
返回目标线程的异步队列是否为空。
如果给出了 error
,则仅检查 error
类型的延迟事件。
有关更多信息,请参见 ::pending_interrupt?
。
static VALUE rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread) { rb_thread_t *target_th = rb_thread_ptr(target_thread); if (!target_th->pending_interrupt_queue) { return Qfalse; } if (rb_threadptr_pending_interrupt_empty_p(target_th)) { return Qfalse; } if (rb_check_arity(argc, 0, 1)) { VALUE err = argv[0]; if (!rb_obj_is_kind_of(err, rb_cModule)) { rb_raise(rb_eTypeError, "class or module required for rescue clause"); } return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err)); } else { return Qtrue; } }
返回 thr 的优先级。默认值继承自创建新线程的当前线程,或初始主线程的零;高优先级线程的运行频率将高于低优先级线程(但低优先级线程也可以运行)。
这只是 Ruby 线程调度器的提示。它可能会在某些平台上被忽略。
Thread.current.priority #=> 0
static VALUE rb_thread_priority(VALUE thread) { return INT2NUM(rb_thread_ptr(thread)->priority); }
将 thr 的优先级设置为 integer。高优先级线程的运行频率将高于低优先级线程(但低优先级线程也可以运行)。
这只是 Ruby 线程调度器的提示。它可能会在某些平台上被忽略。
count1 = count2 = 0 a = Thread.new do loop { count1 += 1 } end a.priority = -1 b = Thread.new do loop { count2 += 1 } end b.priority = -2 sleep 1 #=> 1 count1 #=> 622504 count2 #=> 5832
static VALUE rb_thread_priority_set(VALUE thread, VALUE prio) { rb_thread_t *target_th = rb_thread_ptr(thread); int priority; #if USE_NATIVE_THREAD_PRIORITY target_th->priority = NUM2INT(prio); native_thread_apply_priority(th); #else priority = NUM2INT(prio); if (priority > RUBY_THREAD_PRIORITY_MAX) { priority = RUBY_THREAD_PRIORITY_MAX; } else if (priority < RUBY_THREAD_PRIORITY_MIN) { priority = RUBY_THREAD_PRIORITY_MIN; } target_th->priority = (int8_t)priority; #endif return INT2NUM(target_th->priority); }
从给定线程引发异常。调用者不必是 thr
。有关更多信息,请参见 Kernel#raise
。
Thread.abort_on_exception = true a = Thread.new { sleep(200) } a.raise("Gotcha")
这将产生
prog.rb:3: Gotcha (RuntimeError) from prog.rb:2:in `initialize' from prog.rb:2:in `new' from prog.rb:2
static VALUE thread_raise_m(int argc, VALUE *argv, VALUE self) { rb_thread_t *target_th = rb_thread_ptr(self); const rb_thread_t *current_th = GET_THREAD(); threadptr_check_pending_interrupt_queue(target_th); rb_threadptr_raise(target_th, argc, argv); /* To perform Thread.current.raise as Kernel.raise */ if (current_th == target_th) { RUBY_VM_CHECK_INTS(target_th->ec); } return Qnil; }
返回此 thr
的线程本地“报告异常”条件的状态。
创建 Thread
时的默认值是全局标志 Thread.report_on_exception
的值。
另请参见 report_on_exception=
。
还有一个类级别方法可以为所有新线程设置此值,请参见 ::report_on_exception=
。
static VALUE rb_thread_report_exc(VALUE thread) { return RBOOL(rb_thread_ptr(thread)->report_on_exception); }
当设置为 true
时,如果异常终止了此 thr
,则会在 $stderr 上打印一条消息。有关详细信息,请参见 ::report_on_exception
。
另请参见 report_on_exception
。
还有一个类级别方法可以为所有新线程设置此值,请参见 ::report_on_exception=
。
static VALUE rb_thread_report_exc_set(VALUE thread, VALUE val) { rb_thread_ptr(thread)->report_on_exception = RTEST(val); return val; }
唤醒 thr
,使其有资格进行调度。
a = Thread.new { puts "a"; Thread.stop; puts "c" } sleep 0.1 while a.status!='sleep' puts "Got here" a.run a.join
这将产生
a Got here c
另请参见实例方法 wakeup
。
VALUE rb_thread_run(VALUE thread) { rb_thread_wakeup(thread); rb_thread_schedule(); return thread; }
在 thr 上建立 proc 作为跟踪的处理程序,如果参数为 nil
,则禁用跟踪。
static VALUE thread_set_trace_func_m(VALUE target_thread, VALUE trace) { rb_execution_context_t *ec = GET_EC(); rb_thread_t *target_th = rb_thread_ptr(target_thread); rb_threadptr_remove_event_hook(ec, target_th, call_trace_func, Qundef); if (NIL_P(trace)) { return Qnil; } else { thread_add_trace_func(ec, target_th, trace); return trace; } }
返回 thr
的状态。
"sleep"
-
如果此线程正在休眠或等待 I/O,则返回
"run"
-
当此线程正在执行时
"aborting"
-
如果此线程正在中止
false
-
当此线程正常终止时
nil
-
如果因异常终止。
a = Thread.new { raise("die now") } b = Thread.new { Thread.stop } c = Thread.new { Thread.exit } d = Thread.new { sleep } d.kill #=> #<Thread:0x401b3678 aborting> a.status #=> nil b.status #=> "sleep" c.status #=> false d.status #=> "aborting" Thread.current.status #=> "run"
static VALUE rb_thread_status(VALUE thread) { rb_thread_t *target_th = rb_thread_ptr(thread); if (rb_threadptr_dead(target_th)) { if (!NIL_P(target_th->ec->errinfo) && !FIXNUM_P(target_th->ec->errinfo)) { return Qnil; } else { return Qfalse; } } else { return rb_str_new2(thread_status_name(target_th, FALSE)); } }
如果 thr
已死亡或正在休眠,则返回 true
。
a = Thread.new { Thread.stop } b = Thread.current a.stop? #=> true b.stop? #=> false
static VALUE rb_thread_stop_p(VALUE thread) { rb_thread_t *th = rb_thread_ptr(thread); if (rb_threadptr_dead(th)) { return Qtrue; } return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER); }
如果给定的字符串(或符号)作为线程局部变量存在,则返回 true
。
me = Thread.current me.thread_variable_set(:oliver, "a") me.thread_variable?(:oliver) #=> true me.thread_variable?(:stanley) #=> false
请注意,这些不是纤程局部变量。有关更多详细信息,请参见 Thread#[]
和 Thread#thread_variable_get
。
static VALUE rb_thread_variable_p(VALUE thread, VALUE key) { VALUE locals; VALUE symbol = rb_to_symbol(key); if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { return Qfalse; } locals = rb_thread_local_storage(thread); return RBOOL(rb_hash_lookup(locals, symbol) != Qnil); }
返回已设置的线程局部变量的值。请注意,这些值与纤程局部值不同。对于纤程局部值,请参见 Thread#[]
和 Thread#[]=
。
Thread
局部值随线程一起携带,并且不考虑纤程。例如
Thread.new { Thread.current.thread_variable_set("foo", "bar") # set a thread local Thread.current["foo"] = "bar" # set a fiber local Fiber.new { Fiber.yield [ Thread.current.thread_variable_get("foo"), # get the thread local Thread.current["foo"], # get the fiber local ] }.resume }.join.value # => ['bar', nil]
为线程局部返回的值为“bar”,而为纤程局部返回的值为 nil。纤程在同一线程中执行,因此线程局部值可用。
static VALUE rb_thread_variable_get(VALUE thread, VALUE key) { VALUE locals; VALUE symbol = rb_to_symbol(key); if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { return Qnil; } locals = rb_thread_local_storage(thread); return rb_hash_aref(locals, symbol); }
将具有 key
的线程本地值设置为 value
。请注意,这些值是线程本地的,而不是纤程本地的。有关更多信息,请参见 Thread#thread_variable_get
和 Thread#[]
。
static VALUE rb_thread_variable_set(VALUE thread, VALUE key, VALUE val) { VALUE locals; if (OBJ_FROZEN(thread)) { rb_frozen_error_raise(thread, "can't modify frozen thread locals"); } locals = rb_thread_local_storage(thread); return rb_hash_aset(locals, rb_to_symbol(key), val); }
返回线程局部变量名称的数组(作为符号)。
thr = Thread.new do Thread.current.thread_variable_set(:cat, 'meow') Thread.current.thread_variable_set("dog", 'woof') end thr.join #=> #<Thread:0x401b3f10 dead> thr.thread_variables #=> [:dog, :cat]
请注意,这些不是纤程局部变量。有关更多详细信息,请参见 Thread#[]
和 Thread#thread_variable_get
。
static VALUE rb_thread_variables(VALUE thread) { VALUE locals; VALUE ary; ary = rb_ary_new(); if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { return ary; } locals = rb_thread_local_storage(thread); rb_hash_foreach(locals, keys_i, ary); return ary; }
将 thr 的名称、ID 和状态转储到一个字符串。
static VALUE rb_thread_to_s(VALUE thread) { VALUE cname = rb_class_path(rb_obj_class(thread)); rb_thread_t *target_th = rb_thread_ptr(thread); const char *status; VALUE str, loc; status = thread_status_name(target_th, TRUE); str = rb_sprintf("#<%"PRIsVALUE":%p", cname, (void *)thread); if (!NIL_P(target_th->name)) { rb_str_catf(str, "@%"PRIsVALUE, target_th->name); } if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) { rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE, RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1)); } rb_str_catf(str, " %s>", status); return str; }
使用 join
等待 thr
完成,并返回其值或引发终止线程的异常。
a = Thread.new { 2 + 2 } a.value #=> 4 b = Thread.new { raise 'something went wrong' } b.value #=> RuntimeError: something went wrong
static VALUE thread_value(VALUE self) { rb_thread_t *th = rb_thread_ptr(self); thread_join(th, Qnil, 0); if (UNDEF_P(th->value)) { // If the thread is dead because we forked th->value is still Qundef. return Qnil; } return th->value; }
将给定线程标记为有资格进行调度,但是它可能仍然被 I/O 阻塞。
注意:这不会调用调度程序,有关更多信息,请参见 run
。
c = Thread.new { Thread.stop; puts "hey!" } sleep 0.1 while c.status!='sleep' c.wakeup c.join #=> "hey!"
VALUE rb_thread_wakeup(VALUE thread) { if (!RTEST(rb_thread_wakeup_alive(thread))) { rb_raise(rb_eThreadError, "killed thread"); } return thread; }