やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

Rubyにおける並行処理と使い分けについて

Rubyで並行処理をしたい場合、いくつか選択肢が考えられます。
自分がパッっと思いつくものを列挙しただけでも、これだけの選択肢が出てきました。

  • Thread
  • Process
  • Fiber
  • Async
  • Ractor
  • Concurrent Ruby

単一サーバーにおける並行処理では基本的にはスレッドかプロセスを使う方法が一般的かと思いますが、これだけ選択肢を出されてしまうと一体、何を使えば良いのか分からなくなってしまいます。事実、僕もRailsで並行処理をするようなケースでは毎回、調べては選定をしています。

自分のためにもそれぞれの特徴とどのようなケースで使用するのが良いかを簡単にまとめてみました。

Thread

組み込みライブラリとしてKernelモジュールに実装されているのがThreadです。
ドキュメントに記載のある通り、Threadクラスはシステムコールを経由してネイティブスレッドを呼び出します。
ネイティブスレッドである以上、OSや実行環境によって振る舞いが異なるかもしれません。

threads = (1..5).map do |n|
  Thread.new(n) do |nn|
    sleep 1
    puts ":::recieved #{nn}"
  end
end

threads.each { |t| t.join }
# :::recieved 4
# :::recieved 2
# :::recieved 5
# :::recieved 1
# :::recieved 3

またRubyではGVL(Giant VM lock)があるため、同時に実行されるネイティブスレッドは必ず1つになります。
「じゃあThreadを使う意味ないじゃん」と思われるかもしれませんが、IO待ちやブロッキング処理が発生した場合に、GVLが切り替わるため意味がないと結論づけるのは誤りです。

最も手軽に使えるのがThreadクラスです。
組み込みライブラリなのでアプリケーション全体のファイルサイズが小さくなるというのも嬉しいポイントです。

class Thread (Ruby 3.2 リファレンスマニュアル)

Process

ProcessもThread同様に組み込みライブラリの1つでモジュールとして定義されています。
公式ドキュメントに記載のあるようにシステムコールを通してOSのプロセスを利用することで並行処理を実現しています。明示的にOSのプロセスを使いたいケースではこちらを選択することになるでしょう。

processes = (1..5).each do |n|
  Process.fork do
    sleep 1
    puts ":::recieved #{n} from #{Process.pid()}"
    exit(true)
  end
end

適当なファイルにコードを書いて実行してみると、先ほどのThreadとは異なった結果が得られます。
Rubyプログラムの実行完了後に「:::recived...」の標準出力がされます。ドキュメントに記載の通りOSのプロセスを使用していることが分かります。pidの値も全て異なっていますね。

bashᕙ( ˙-˙ )ᕗ $ bundle exec ruby process_sample.rb
bashᕙ( ˙-˙ )ᕗ $ :::recieved 3 from 36977
:::recieved 1 from 36975
:::recieved 4 from 36978
:::recieved 5 from 36979
:::recieved 2 from 36976

module Process (Ruby 3.2 リファレンスマニュアル)

Fiber

Threadとは異なり、ユーザーレベルスレッド(OSが提供していないスレッド機構)を使用します。
有名どころでいうとGolangのgoroutineに近いものでしょうか。ドキュメントにはcoroutineと呼ばれるもの...という記載があります。というかRubyで軽量スレッドが使えるとは思っていなかったので驚きました。

今までFiberは一度も使った事がなかったです。
記事の執筆にあたってサンプルコードを書いてみた所感なのですが、コンテキストの切り替えをユーザーが管理しないといけないため、Fiberを使いこなせるようになるには修練が必要です。

fiber_a = Fiber.new do
  puts ':::puts from fiberA(1)'
  Fiber.yield(1)
  puts ':::puts from fiberA(2)'
  Fiber.yield(2)
  puts ':::puts from fiberA(3)'
  Fiber.yield(3)
end

fiber_b = Fiber.new do
  puts ':::puts from fiberB(1)'
  Fiber.yield(1)
  puts ':::puts from fiberB(2)'
  Fiber.yield(2)
  puts ':::puts from fiberB(3)'
  Fiber.yield(3)
end

fiber_a.resume() # :::puts from fiberA(1)
fiber_a.resume() # :::puts from fiberA(2)
fiber_b.resume() # :::puts from fiberB(1)
fiber_b.resume() # :::puts from fiberB(2)
fiber_a.resume() # :::puts from fiberA(3)
fiber_b.resume() # :::puts from fiberB(3)

最も基本的な使い方はyeildとresumeを使ってコンテキストを切り替える方法です。
上記では2つの軽量スレッドを作成し順にコンテキストが切り替わり標準出力がされていることが分かります。
使い方次第では色んなことができそうなFiberですが、先ほど書いたようにコンテキストの管理が難しいので、気軽に使えるものではないと感じました。

Fiberを使った簡易queueの実装

queue = Fiber.new do
  q = []
  v = nil

  loop do
    order = Fiber.yield(v)
    case order[:action]
    when :enqueue
      q.push(order[:value])
    when :dequeue
      v = q.shift
    end
  end
end

queue.resume()
queue.resume({ action: :enqueue, value: 1 })
queue.resume({ action: :enqueue, value: 2 })
queue.resume({ action: :enqueue, value: 3 })

puts queue.resume({ action: :dequeue }) # 1
puts queue.resume({ action: :dequeue }) # 2
puts queue.resume({ action: :dequeue }) # 3

Async

Asyncは組み込みライブラリではなくgemなので、インストールが必要です。
Asyncは内部でFiberを使用しています。簡単に言えばAsyncはFiberをいい感じに使うためにFiberのコンテキスト管理の実装を隠蔽してくれているラッパーです。

# 一部を抜粋: https://github.com/socketry/async/blob/main/lib/async/task.rb
def backtrace(*arguments)
  @fiber&.backtrace(*arguments)
end

def yield
  Fiber.scheduler.yield
end

基本的な使い方としてはKernalに拡張されたAsyncメソッドを呼び出して内部のブロックに処理を記述します。
Fiberほど複雑なことはできませんが「軽量スレッドで気軽に何か書きたい」時はAsyncが候補に上がってきそうです。

require 'async'

Async do |task|
  task.async do
    sleep 3
    puts ':::puts from 1'
  end
  task.async do
    sleep 2
    puts ':::puts from 2'
  end
  task.async do
    sleep 1
    puts ':::puts from 3'
  end
end

puts "Final Hello World!"

# :::puts from 3
# :::puts from 2
# :::puts from 1
# Final Hello World!

GitHub - socketry/async: An awesome asynchronous event-driven reactor for Ruby.

Ractor

Rubyでアクターモデル(Actor)を扱えるものがRuby3.0から導入されました。
現段階では、実験的な導入であり将来的に振る舞いが変わるかも...という旨の警告が表示されます。
個人的にはElixirでアクターモデルをガンガン触っていたので、非常に慣れ親しんだものがRubyに導入されたことが嬉しいですし、可能であるなら全ての並行処理をRactorで書いてしまいたいです。アクターモデルらしくお互いにメッセージを送受信する事ができます。

r2 = Ractor.new do
  b_in_ractor = receive
  puts "I am in B-Ractor! received=#{b_in_ractor}"
end

r1 = Ractor.new(r2) do |r2_address|
  a_in_ractor = receive
  puts "I am in A-Ractor! received=#{a_in_ractor}"
  r2_address.send('hello! from r2')
  r2_address.take
end

r1.send(1)
r1.take

# ractor_sample.rb:1: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
# I am in A-Ractor! received=1
# I am in B-Ractor! received=hello! from r2

現時点では試験的なものである以上、実戦導入は難しいですね。

class Ractor - RDoc Documentation

Concurrent Ruby

github.com

Concurrent RubyもAsync同様、gemなのでインストールが必要です。
Concurrent Rubyでは他言語で使用されている並行処理のモデルをRubyで実装するという非常に面白い試みをしており、一例をあげるとJavaScriptで採用されているPromiseなどが実装されています。
(※厳密には異なるものです)

数が多いためサンプルコードは記載しませんが、これまで紹介した方法ではカバーできない問題がある場合にはConcurrent Rubyが役立つかもしれません。自分はConcurrent::Promisesにお世話になっています。

まとめ

  • Thread: 組み込みライブラリで手軽に使える。ネイティブスレッドを使用
  • Process: 組み込みライブラリで手軽に使える。OSのプロセスを使用
  • Fiber: 組み込みライブラリで手軽に使える。軽量スレッドだがコンテキストの切り替えの管理が必要
  • Async: インストールが必要。Fiberのラッパーで気軽に軽量スレッドを扱う処理が書ける
  • Ractor: 組み込みライブラリで手軽に使える。Ruby3.0より導入されたアクターモデルを実装したもの
  • Concurrent Ruby: インストールが必要。さまざまな並行処理のモデルを実装したもの

自分だったら特に理由がなければThreadを第一候補にします。
その後、要件や問題の特性によって他の選択肢を考えるというのが良さそうです。
特にFiberは軽量スレッド...ということを初めて知ったので、今後、面白いことに使えそうです。

補足: 大文字はじまりの関数定義

「Rubyって大文字はじまりの関数を定義できるの?」と思って試してみたところ、ブロックを指定すれば可能でした...。 ただしブロックを指定しないとエラーになります。優先順位がブロックの方が高いのでしょうか。

def Sample(&block)
  print "hello "
  yield
end

Sample do
  puts "world!"
end
# hello world!

少しでも「ええな〜」と思ったらはてなスター・はてなブックマーク・シェアを頂けると励みになります。