Elixirでの並行処理
Elixir
では並行処理を行うためにアクターモデルに従ってプロセス間同士でメッセージを送受(メッセージパッシング)し合います。
アクターモデルとは並行処理を効率よく実装するための手法の一つです。Elixir
においては、アクター(軽量プロセス)はアクター(軽量プロセス)を生成することが出来ます。また、全てのアクターはメッセージボックスを持ち、それぞれがメッセージの送受信を行うことができます。
全ての値は暗黙的に変更されることはなく、メッセージの送受の結果としてだけ値の変更が許容されます。逆に言えば、メッセージの送受以外では値の変更はされることはありません。そのため、データ不整合の問題や、ロック(排他処理)を明示的に実装する必要がありません(任意の処理が終わるまで待つということはあるでしょう)。
ids = [1, 2, 3, 4] => message: push 5 ids = [1, 2, 3, 4, 5] => message: pop ids = [1, 2, 3, 4]
Elixir
でのメッセージパッシングの実装については過去に記事にしておりますので、合わせてご覧ください。
今回は、メッセージパッシングを実装する上でメッセージを何度も受信して、処理を行うという再帰的なメッセージパッシングを無名関数でサクッと記述する方法についてまとめております。
通常の再帰的なメッセージ受信の記述
こんな感じで、気軽に実装しようとすると、わざわざmodule
を使って実装する必要があり、個人的にはボリューミーだなぁと思ってしまいます。とはいえ、可読性はかなり良いですね。素直にmodule
を使って実装するのが一番だなと改めて感じさせられました。
def receive_message([]), do: {:ok, []} def receive_message([h | t]) do receive do {:POP, _pid} -> IO.puts("[Info]: #{h} was popped") receive_message(t) _ -> exit(:safety) end end
結果確認のため、簡単にプロセスの起動と、メッセージの送信部分を関数化してmodule
に内包しました。
defmodule Recursive.Reciver do def process do pid = Enum.map(1..10, &(&1)) |> spawn_receiver() Enum.each(1..10, fn _ -> :timer.sleep(1000) send(pid, {:POP, self()}) end) end def spawn_receiver(lst) do spawn(fn -> receive_message(lst) end) end def receive_message([]), do: {:ok, []} def receive_message([h | t]) do receive do {:POP, _pid} -> IO.puts("[Info]: #{h} was popped") receive_message(t) _ -> exit(:safety) end end end
実行結果
iex(5)> Recursive.Reciver.process [Info]: 1 was popped [Info]: 2 was popped [Info]: 3 was popped [Info]: 4 was popped [Info]: 5 was popped [Info]: 6 was popped [Info]: 7 was popped [Info]: 8 was popped [Info]: 9 was popped [Info]: 10 was popped :ok
無名関数を使った再帰的なメッセージ受信の記述
以下の記事で紹介した無名関数の再起処理のテクニックを用いて、こんな感じで無名関数だけでmodule
を使わずに記述することが出来ました。
(あれ、module
の方がいいんじゃないか...??)
(もはやロマン)
receiver = fn lst -> recurcive_ = fn lst_, own -> receive do {:POP, _pid} -> [h | t] = lst_ IO.puts("[Info]: #{h} was popped") own.(t, own) _ -> exit(:safety) end end recurcive_.(lst, recurcive_) end
無名関数は自身のスコープを持っておらず、自己参照することは出来ないので、引数に自分自身を渡してあげます。こうすることで無名関数から自分自身を呼び出し再帰処理を行うことが可能になります。あとは先ほどと同じように再帰的なメッセージの受信の処理を記述してあげれば良いですね。
# 実行系 process = fn -> lst = Enum.map(1..10, &(&1)) selecter_pid = spawn(fn -> receiver.(lst) end) Enum.each(1..10, fn _ -> :timer.sleep(1000) send(selecter_pid, {:POP, self()}) end) end
実行結果
iex(8)> process.() [Info]: 1 was popped [Info]: 2 was popped [Info]: 3 was popped [Info]: 4 was popped [Info]: 5 was popped [Info]: 6 was popped [Info]: 7 was popped [Info]: 8 was popped [Info]: 9 was popped [Info]: 10 was popped :ok
メッセージの種類や、データの量が多くなってくるようであれば、素直にTask
やAgent
、GenServer
を使って実装した方が楽になっていきます。