田舎で並行処理の夢を見る

試したこと、需要がないかもしれないけど細々とアウトプットしてます

【Elixir】無名関数で気軽に再帰的にメッセージの送受信を実装する

Elixirでの並行処理

Elixirでは並行処理を行うためにアクターモデルに従ってプロセス間同士でメッセージを送受(メッセージパッシング)し合います。
アクターモデルとは並行処理を効率よく実装するための手法の一つです。Elixirにおいては、アクター(軽量プロセス)はアクター(軽量プロセス)を生成することが出来ます。また、全てのアクターはメッセージボックスを持ち、それぞれがメッセージの送受信を行うことができます。

全ての値は暗黙的に変更されることはなく、メッセージの送受の結果としてだけ値の変更が許容されます。逆に言えば、メッセージの送受以外では値の変更はされることはありません。そのため、データ不整合の問題や、ロック(排他処理)を明示的に実装する必要がありません(任意の処理が終わるまで待つということはあるでしょう)。

ids = [1, 2, 3, 4]

=> message: push 5

ids = [1, 2, 3, 4, 5]

=> message: pop

ids = [1, 2, 3, 4]

Elixirでのメッセージパッシングの実装については過去に記事にしておりますので、合わせてご覧ください。

www.okb-shelf.work

今回は、メッセージパッシングを実装する上でメッセージを何度も受信して、処理を行うという再帰的なメッセージパッシングを無名関数でサクッと記述する方法についてまとめております。

通常の再帰的なメッセージ受信の記述

こんな感じで、気軽に実装しようとすると、わざわざmoduleを使って実装する必要があり、個人的にはボリューミーだなぁと思ってしまいます。とはいえ、可読性はかなり良いですね。素直にmoduleを使って実装するのが一番だなと改めて感じさせられました。

def receive_message([]), do: {:ok, []}
  def receive_message([h | t]) do
    receive do
      {:POP, _pid} ->
        IO.puts("[Info]: #{h} was popped")
        receive_message(t)
      _ -> exit(:safety)
  end
end

結果確認のため、簡単にプロセスの起動と、メッセージの送信部分を関数化してmoduleに内包しました。

defmodule Recursive.Reciver do
  def process do
    pid = Enum.map(1..10, &(&1)) |> spawn_receiver()
    Enum.each(1..10, fn _ ->
      :timer.sleep(1000)
      send(pid, {:POP, self()})
    end)
  end
  def spawn_receiver(lst) do
    spawn(fn -> receive_message(lst) end)
  end
  def receive_message([]), do: {:ok, []}
  def receive_message([h | t]) do
    receive do
      {:POP, _pid} ->
        IO.puts("[Info]: #{h} was popped")
        receive_message(t)
      _ -> exit(:safety)
    end
  end
end

実行結果

iex(5)> Recursive.Reciver.process
[Info]: 1 was popped
[Info]: 2 was popped
[Info]: 3 was popped
[Info]: 4 was popped
[Info]: 5 was popped
[Info]: 6 was popped
[Info]: 7 was popped
[Info]: 8 was popped
[Info]: 9 was popped
[Info]: 10 was popped
:ok

無名関数を使った再帰的なメッセージ受信の記述

以下の記事で紹介した無名関数の再起処理のテクニックを用いて、こんな感じで無名関数だけでmoduleを使わずに記述することが出来ました。

www.okb-shelf.work

(あれ、moduleの方がいいんじゃないか...??)
(もはやロマン)

receiver = fn lst ->
  recurcive_ = fn lst_, own ->
    receive do
      {:POP, _pid} ->
        [h | t] = lst_
        IO.puts("[Info]: #{h} was popped")
        own.(t, own)
      _ -> exit(:safety)
    end
  end
  recurcive_.(lst, recurcive_)
end

無名関数は自身のスコープを持っておらず、自己参照することは出来ないので、引数に自分自身を渡してあげます。こうすることで無名関数から自分自身を呼び出し再帰処理を行うことが可能になります。あとは先ほどと同じように再帰的なメッセージの受信の処理を記述してあげれば良いですね。

# 実行系
process = fn ->
  lst = Enum.map(1..10, &(&1))
  selecter_pid = spawn(fn -> receiver.(lst) end)
  Enum.each(1..10, fn _ ->
    :timer.sleep(1000)
    send(selecter_pid, {:POP, self()})
  end)
end

実行結果

iex(8)> process.()
[Info]: 1 was popped
[Info]: 2 was popped
[Info]: 3 was popped
[Info]: 4 was popped
[Info]: 5 was popped
[Info]: 6 was popped
[Info]: 7 was popped
[Info]: 8 was popped
[Info]: 9 was popped
[Info]: 10 was popped
:ok

メッセージの種類や、データの量が多くなってくるようであれば、素直にTaskAgentGenServerを使って実装した方が楽になっていきます。

参考文献