なぜ書いたのか
Elixir
においてプロセスの死活管理に用いるsupervisor
について学習を進めていたが、どれもGenServer
、すなわちElixir
のbehavior
を用いたサンプルばかりだった。また、そのプロジェクトはmix new --sup project_name
として生成されるもので、後から「supervisor
入れたい」となる事が割とあったが自前でmix
プロジェクトにsupervisor
を定義する方法についてよく分かっていなかった。
気軽に自身で作成したElixir
のbehavior
を用いないモジュールに対してsupervisor
を定義するための手順がまとまった日本語記事が確認出来なかったので自身のためにこの記事を手順のまとめとして作成した。
今回扱うサンプルについて
プロセスの起動時に特定のメッセージをメインのプロセスに指定回数だけ送信し、上限に達した際にプロセスをkillするという簡単なものを扱う。せっかくなので、message passing
が扱えるものを採用してみた。参考にした記事ではアキュムレーターをデクリメントしてカウントが0になったら終了するというシンプルなものだったので少し拡張させた。
プロセスの関係性は以下のようになる。
プロジェクトの作成
コードの全体はこちら。
github.com
今回はmix
を使わずに、プレーンに作っていくため、まずは適当にディレクトリを作る。
$ mkdir supervisor_sample
$ cd supervisor_sample
次にexファイルを用意。ファイル分割を考えたが手間になるため、今回は1ファイルに全てを記述する。
$ touch supervisor_sample/supervisor_sample.ex
各プロセスにて実行するモジュール内関数
関数名はsupervisor
の公式のサンプルにあるような形式に乗っ取ってそれぞれ、start_link
とinit
としておくが現在の推奨バージョンでは、特に関数名の縛りはないため自由な命名をすることが出来る。参考にした記事の実装は現在は非推奨 となっているため、一部を書き換えている。
defmodule Child do @moduledoc """ supervisorから起動されるタスクを実行するプロセス 上限回数まで親プロセスにメッセージを送信して、上限に達したら終了してkillされる """ @doc """ supervisorから実行される関数。 supervisorからlinkされたプロセスの生成を行う """ def start_link(receiver, max_send) do # launch process link to supervisor process pid = spawn_link(__MODULE__, :init, [receiver, max_send]) {:ok, pid} end def init(receiver, max_send) do # set seed for each process IO.puts("Start child with pid #{inspect(self())}") Process.sleep(200) end end
これでsupervisor
からこのモジュールが呼び出されてタスクを実行するプロセスが立ち上がるようになった。spawn_link
を使っているのはプロセスが死んだことをsupervisor
に通知するため。
しかし、このままではプロセスが立ち上がって何もせずに終了してしまうため、init
に親プロセスに対してメッセージを送信するための処理を追加する。指定回数まで実行させたいので新たな関数を定義する。
defmodule Child do def init(receiver, max_send) do # set seed for each process IO.puts("Start child with pid #{inspect(self())}") Process.sleep(200) sender(receiver, max_send) end @doc """ 上限回数まで再帰的にメッセージを親プロセスに送信する関数 """ def sender(_, 0), do: :ok def sender(receiver, max_send) do send(receiver, {:MESSAGE, "hello! from #{inspect(self())}"}) sender(receiver, max_send-1) end end
supervisor
の起動時に受け取った親プロセスのPID
を元に{:MESSAGE, *本文}
を送信する。再帰的に実行し、アキュムレーターの値が0になった時点で再帰を終了する。これでプロセスに実行させる処理の記述が完了した。
supervisorの定義
次に呼び出す元となる親プロセス、すなわちsupervisor
の定義をする。ただ単にsupervisor
のbehavior
に従って実装しただけになる。
defmodule Parent do @moduledoc """ supervisorの定義モジュール """ use Supervisor @doc """ supervisorの起動 """ def start_link(receiver, total_process) do Supervisor.start_link(__MODULE__, {receiver, total_process}, name: __MODULE__) end @doc """ supervisorの設定と戦略をまとめた関数 """ def init({receiver, total_process}) do children = Enum.map(1..total_process, fn n -> %{ id: "#{__MODULE__}_#{n}", # Childのstart_link関数に引数を渡して呼び出す。 start: {Child, :start_link, [receiver, total_process]}, restart: :permanent } end) # Aプロセスが死んだ時にAプロセスを復活させる -> 上限は10回(全プロセスで合算) Supervisor.init(children, strategy: :one_for_one, max_restarts: 10) end end
実装が必要なのはstart_link
とinit
と命名された2つの関数。start_link
はsupervisor
の起動のための関数で、init
はsupervisor
の設定、戦略をまとめた関数になる。start_link
に関しては特に手を加える部分はないが、第2引数に値を設定することでinit
関数に引数を渡すことが可能となる。
init
関数でsupervisor
で管理するプロセスの設定をしている。設定を定義したmap
を要素に持つリストを作成するか、Supervisor.child_spec/2
という関数を用いる。どんな設定が出来るのかは下記に記述されている。特に重要なのはstart:
の部分で、ここにsupervisor
から起動させたいモジュールと関数名、引数値を指定する。Task
やAgent
で指定する形式と同じなので特に困惑する部分はない。
Supervisor.init
ではsupervisor
の戦略を定義する。今回は特殊な設定はしておらず、死んだプロセスを1つ1つ都度、復活させる*one_for_one
、最大復活プロセス数は10とした。
これにて実行するプロセスとsupervisor
の設定が可能となった。
実行ファイル
最後に作成したsupervisor
を気軽に呼び出すため、送信されてきたメッセージを受信するための再帰receive
を実装したwrapperモジュールを用意する。
defmodule SupervisorSample do @moduledoc """ supervisorの呼び出しとメッセージの受信をサボるためのwrapperモジュール """ @doc """ supervisorの起動と再帰的メッセージ受信ループを実行 """ def launch(total_process) do # launch supervisor Parent.start_link(self(), total_process) receiver() end def receiver() do receive do {:MESSAGE, content} -> IO.puts("received message!: #{inspect(content)}") receiver() end end end
実行結果
iex
を立ち上げて、.ex
ファイルをコンパイルして実行する。
$ iex
iex(1)> c("supervisor_sample.ex") [Child, Parent, SupervisorSample]
今回は簡単のために立ち上げるプロセスは2つとして実行する。実行すると以下のようなlogが出力される。
iex(2)> SupervisorSample.launch(2) Start child with pid #PID<0.116.0> Start child with pid #PID<0.117.0> Start child with pid #PID<0.118.0> Start child with pid #PID<0.119.0> received message!: "hello! from #PID<0.116.0>" received message!: "hello! from #PID<0.116.0>" received message!: "hello! from #PID<0.117.0>" received message!: "hello! from #PID<0.117.0>" received message!: "hello! from #PID<0.119.0>" received message!: "hello! from #PID<0.119.0>" Start child with pid #PID<0.120.0> Start child with pid #PID<0.121.0> received message!: "hello! from #PID<0.118.0>" received message!: "hello! from #PID<0.118.0>" Start child with pid #PID<0.122.0> Start child with pid #PID<0.123.0> received message!: "hello! from #PID<0.120.0>" received message!: "hello! from #PID<0.120.0>" received message!: "hello! from #PID<0.121.0>" received message!: "hello! from #PID<0.121.0>" received message!: "hello! from #PID<0.122.0>" Start child with pid #PID<0.124.0> Start child with pid #PID<0.125.0> received message!: "hello! from #PID<0.122.0>" received message!: "hello! from #PID<0.123.0>" received message!: "hello! from #PID<0.123.0>" received message!: "hello! from #PID<0.124.0>" Start child with pid #PID<0.126.0> Start child with pid #PID<0.127.0> received message!: "hello! from #PID<0.124.0>" received message!: "hello! from #PID<0.125.0>" received message!: "hello! from #PID<0.125.0>" received message!: "hello! from #PID<0.126.0>" ** (EXIT from #PID<0.104.0>) shell process exited with reason: shutdown
こいつをElixir
で整形して何回プロセスが再起動されているかを確認してみると12
となっている。
log = """ Start child with pid #PID<0.116.0> : : received message!: "hello! from #PID<0.126.0>" """ log |> String.split("\n") |> Enum.filter(fn s -> String.starts_with?(s, "Start child with") end) |> length() |> IO.puts() # 12
これは最初に起動される2つのプロセスは当然ながら再起動のカウントに含まれていないということを意味している。どうやら上手くsupervisor
が動作しているようだ。