田舎で並行処理の夢を見る

試したこと、需要がないかもしれないけど細々とアウトプットしてます

【Elixir/supervisor入門】通常のモジュールに対してsupervisorを定義する

なぜ書いたのか

Elixirにおいてプロセスの死活管理に用いるsupervisorについて学習を進めていたが、どれもGenServer、すなわちElixirbehaviorを用いたサンプルばかりだった。また、そのプロジェクトはmix new --sup project_nameとして生成されるもので、後から「supervisor入れたい」となる事が割とあったが自前でmixプロジェクトにsupervisorを定義する方法についてよく分かっていなかった。

気軽に自身で作成したElixirbehaviorを用いないモジュールに対してsupervisorを定義するための手順がまとまった日本語記事が確認出来なかったので自身のためにこの記事を手順のまとめとして作成した。

今回扱うサンプルについて

プロセスの起動時に特定のメッセージをメインのプロセスに指定回数だけ送信し、上限に達した際にプロセスをkillするという簡単なものを扱う。せっかくなので、message passingが扱えるものを採用してみた。参考にした記事ではアキュムレーターをデクリメントしてカウントが0になったら終了するというシンプルなものだったので少し拡張させた。

medium.com

プロセスの関係性は以下のようになる。
f:id:takamizawa46:20200418124309p:plain:w550

プロジェクトの作成

コードの全体はこちら。
github.com

今回はmixを使わずに、プレーンに作っていくため、まずは適当にディレクトリを作る。

$ mkdir supervisor_sample
$ cd supervisor_sample

次にexファイルを用意。ファイル分割を考えたが手間になるため、今回は1ファイルに全てを記述する。

$ touch supervisor_sample/supervisor_sample.ex

各プロセスにて実行するモジュール内関数

関数名はsupervisorの公式のサンプルにあるような形式に乗っ取ってそれぞれ、start_linkinitとしておくが現在の推奨バージョンでは、特に関数名の縛りはないため自由な命名をすることが出来る。参考にした記事の実装は現在は非推奨 となっているため、一部を書き換えている。

defmodule Child do
  @moduledoc """
    supervisorから起動されるタスクを実行するプロセス
    上限回数まで親プロセスにメッセージを送信して、上限に達したら終了してkillされる
  """

  @doc """
    supervisorから実行される関数。
    supervisorからlinkされたプロセスの生成を行う
  """
  def start_link(receiver, max_send) do
    # launch process link to supervisor process
    pid = spawn_link(__MODULE__, :init, [receiver, max_send])
    {:ok, pid}
  end

  def init(receiver, max_send) do
    # set seed for each process
    IO.puts("Start child with pid #{inspect(self())}")
    Process.sleep(200)
  end
end

これでsupervisorからこのモジュールが呼び出されてタスクを実行するプロセスが立ち上がるようになった。spawn_linkを使っているのはプロセスが死んだことをsupervisorに通知するため。
しかし、このままではプロセスが立ち上がって何もせずに終了してしまうため、initに親プロセスに対してメッセージを送信するための処理を追加する。指定回数まで実行させたいので新たな関数を定義する。

defmodule Child do
  def init(receiver, max_send) do
    # set seed for each process
    IO.puts("Start child with pid #{inspect(self())}")
    Process.sleep(200)
    sender(receiver, max_send)
  end

  @doc """
    上限回数まで再帰的にメッセージを親プロセスに送信する関数
  """
  def sender(_, 0), do: :ok
  def sender(receiver, max_send) do
    send(receiver, {:MESSAGE, "hello! from #{inspect(self())}"})
    sender(receiver, max_send-1)
  end
end

supervisorの起動時に受け取った親プロセスのPIDを元に{:MESSAGE, *本文}を送信する。再帰的に実行し、アキュムレーターの値が0になった時点で再帰を終了する。これでプロセスに実行させる処理の記述が完了した。

supervisorの定義

次に呼び出す元となる親プロセス、すなわちsupervisorの定義をする。ただ単にsupervisorbehaviorに従って実装しただけになる。

defmodule Parent do
  @moduledoc """
    supervisorの定義モジュール
  """
  use Supervisor

  @doc """
    supervisorの起動
  """
  def start_link(receiver, total_process) do
    Supervisor.start_link(__MODULE__, {receiver, total_process}, name: __MODULE__)
  end

  @doc """
    supervisorの設定と戦略をまとめた関数
  """
  def init({receiver, total_process}) do
    children = Enum.map(1..total_process, fn n ->
      %{
        id: "#{__MODULE__}_#{n}",
        # Childのstart_link関数に引数を渡して呼び出す。
        start: {Child, :start_link, [receiver, total_process]},
        restart: :permanent
      }
    end)

    # Aプロセスが死んだ時にAプロセスを復活させる -> 上限は10回(全プロセスで合算)
    Supervisor.init(children, strategy: :one_for_one, max_restarts: 10)
  end
end

実装が必要なのはstart_linkinit命名された2つの関数。start_linksupervisorの起動のための関数で、initsupervisorの設定、戦略をまとめた関数になる。start_linkに関しては特に手を加える部分はないが、第2引数に値を設定することでinit関数に引数を渡すことが可能となる。

init関数でsupervisorで管理するプロセスの設定をしている。設定を定義したmapを要素に持つリストを作成するか、Supervisor.child_spec/2という関数を用いる。どんな設定が出来るのかは下記に記述されている。特に重要なのはstart:の部分で、ここにsupervisorから起動させたいモジュールと関数名、引数値を指定する。TaskAgentで指定する形式と同じなので特に困惑する部分はない。

Supervisor.initではsupervisorの戦略を定義する。今回は特殊な設定はしておらず、死んだプロセスを1つ1つ都度、復活させる*one_for_one、最大復活プロセス数は10とした。

これにて実行するプロセスとsupervisorの設定が可能となった。

実行ファイル

最後に作成したsupervisorを気軽に呼び出すため、送信されてきたメッセージを受信するための再帰receiveを実装したwrapperモジュールを用意する。

defmodule SupervisorSample do
  @moduledoc """
    supervisorの呼び出しとメッセージの受信をサボるためのwrapperモジュール
  """

  @doc """
    supervisorの起動と再帰的メッセージ受信ループを実行
  """
  def launch(total_process) do
    # launch supervisor
    Parent.start_link(self(), total_process)
    receiver()
  end

  def receiver() do
    receive do
      {:MESSAGE, content} ->
        IO.puts("received message!: #{inspect(content)}")
        receiver()
    end
  end
end

実行結果

iexを立ち上げて、.exファイルをコンパイルして実行する。

$ iex

iex(1)> c("supervisor_sample.ex")  
[Child, Parent, SupervisorSample]  

今回は簡単のために立ち上げるプロセスは2つとして実行する。実行すると以下のようなlogが出力される。

iex(2)> SupervisorSample.launch(2)
Start child with pid #PID<0.116.0>
Start child with pid #PID<0.117.0>
Start child with pid #PID<0.118.0>
Start child with pid #PID<0.119.0>
received message!: "hello! from #PID<0.116.0>"
received message!: "hello! from #PID<0.116.0>"
received message!: "hello! from #PID<0.117.0>"
received message!: "hello! from #PID<0.117.0>"
received message!: "hello! from #PID<0.119.0>"
received message!: "hello! from #PID<0.119.0>"
Start child with pid #PID<0.120.0>
Start child with pid #PID<0.121.0>
received message!: "hello! from #PID<0.118.0>"
received message!: "hello! from #PID<0.118.0>"
Start child with pid #PID<0.122.0>
Start child with pid #PID<0.123.0>
received message!: "hello! from #PID<0.120.0>"
received message!: "hello! from #PID<0.120.0>"
received message!: "hello! from #PID<0.121.0>"
received message!: "hello! from #PID<0.121.0>"
received message!: "hello! from #PID<0.122.0>"
Start child with pid #PID<0.124.0>
Start child with pid #PID<0.125.0>
received message!: "hello! from #PID<0.122.0>"
received message!: "hello! from #PID<0.123.0>"
received message!: "hello! from #PID<0.123.0>"
received message!: "hello! from #PID<0.124.0>"
Start child with pid #PID<0.126.0>
Start child with pid #PID<0.127.0>
received message!: "hello! from #PID<0.124.0>"
received message!: "hello! from #PID<0.125.0>"
received message!: "hello! from #PID<0.125.0>"
received message!: "hello! from #PID<0.126.0>"
** (EXIT from #PID<0.104.0>) shell process exited with reason: shutdown

こいつをElixirで整形して何回プロセスが再起動されているかを確認してみると12となっている。

log =
"""
Start child with pid #PID<0.116.0>
:
:
received message!: "hello! from #PID<0.126.0>"
"""

log
|> String.split("\n")
|> Enum.filter(fn s -> String.starts_with?(s, "Start child with") end)
|> length()
|> IO.puts() # 12

これは最初に起動される2つのプロセスは当然ながら再起動のカウントに含まれていないということを意味している。どうやら上手くsupervisorが動作しているようだ。

github.com

参考文献