【Ruby】2スレッドの値交換。並列イテレーター内でスレッド生成(居眠りなど)。
2つのスレッドが値を交換
require 'thread' class Exchanger def initialize @first_value = @second_value = nil @lock = Mutex.new @first = Mutex.new @second = ConditionVariable.new end def exchange(value) @lock.synchronize do if @first.try_lock @first_value = value @second.wait(@lock) #<= 2番目のスレッドを待つ @first.unlock @second_value else @second_value = value @second.signal @first_value end end end end e = Exchanger.new Thread.new do value = e.exchange("no1") p value + "dayo" end Thread.new do sleep(3) value = e.exchange("no2") p value + "dayodayo" end loop do end
■実行結果
"no1dayodayo" "no2dayo"
へー。
複数回実行する
以前に勉強した
並列イテレーターや
他スレッドの終了を待つメソッドを加えて、
詳しく動きを理解してみよう。
通常のイテレータ、居眠りあり(並列なし)
require 'thread' class Exchanger def initialize @first_value = @second_value = nil @lock = Mutex.new @first = Mutex.new @second = ConditionVariable.new end def exchange(value) @lock.synchronize do if @first.try_lock @first_value = value @second.wait(@lock) #<= 2番目のスレッドを待つ @first.unlock @second_value else @second_value = value @second.signal @first_value end end end end module Enumerable def concurrently(&block) threads = [] self.each_with_index do |item, i| threads << Thread.new{block.call(item, i)} end threads.map{|t| t.join} end end Thread.abort_on_exception = true _hash = {a: "aa", b: "bb",c: "cc", d: "dd", e: "ee"} _hash.each_with_index do |(key, val), i| #<= ループA e = Exchanger.new Thread.new do value = e.exchange(key) p value.to_s + " #{i}" #<= (p1) end Thread.new do sleep(1) #<= イ value = e.exchange(val) p value.to_s + " #{i} dayo" #<= (p2) end sleep(1) #<= ア p "#{i} time end" #<= (p3) end loop do end
■実行結果
"0 time end" "a 0 dayo" "aa 0" "1 time end" "b 1 dayo" "bb 1" "2 time end" "c 2 dayo" "cc 2" "3 time end" "d 3 dayo" "dd 3" "4 time end" "e 4 dayo" "ee 4"
■考察
図で書くとこういう感じ
(ピンクはイテレーターの各要素の動き)
通常のイテレーター(居眠りなし)
さっきの
sleep(1) #<= ア
を消してみると?
■実行結果
"0 time end" "1 time end" "2 time end" "3 time end" "4 time end" "b 1 dayo" "a 0 dayo" "bb 1" "c 2 dayo" "cc 2" "e 4 dayo" "d 3 dayo" "aa 0" "ee 4" "dd 3"
■考察
こういうイメージ
並列イテレーター(居眠りあり)
関係のあるコードだけ、、
module Enumerable def concurrently(&block) threads = [] self.each_with_index do |item, i| threads << Thread.new{block.call(item, i)} end threads.map{|t| t.join} end end Thread.abort_on_exception = true _hash = {a: "aa", b: "bb",c: "cc", d: "dd", e: "ee"} _hash.concurrently do |(key, val), i| #<= ループA e = Exchanger.new Thread.new do value = e.exchange(key) p value.to_s + " #{i}" #<= (p1) end Thread.new do sleep(1) #<= イ value = e.exchange(val) p value.to_s + " #{i} dayo" #<= (p2) end sleep(1) #<= ア p "#{i} time end" #<= (p3) end
■実行結果
"4 time end" "0 time end" "1 time end" "2 time end" "d 3 dayo" "3 time end" "e 4 dayo" "b 1 dayo" "bb 1" "a 0 dayo" "c 2 dayo" "dd 3" "ee 4" "aa 0" "cc 2"
■考察
並列なので、居眠りしていても、他の要素には関係がない
イテレーターの各要素でスレッド終了を待つ
def join_all main = Thread.main current = Thread.current all = Thread.list all.each{|t| t.join unless t==current || t==main} end Thread.abort_on_exception = true _hash = {a: "aa", b: "bb",c: "cc", d: "dd", e: "ee"} _hash.each_with_index do |(key, val), i| #<= ループA e = Exchanger.new Thread.new do value = e.exchange(key) p value.to_s + " #{i}" #<= (p1) end Thread.new do sleep(1) #<= イ value = e.exchange(val) p value.to_s + " #{i} dayo" #<= (p2) end join_all p "#{i} time end" #<= (p3) end loop do end
■実行結果
"a 0 dayo" "aa 0" "0 time end" "b 1 dayo" "bb 1" "1 time end" "c 2 dayo" "cc 2" "2 time end" "d 3 dayo" "dd 3" "3 time end" "e 4 dayo" "ee 4" "4 time end"
■考察
こういう感じ。
ちょっとスレッドが分かった