今更作った理由
たまたまgoogle scholar
で並行処理に関する資料を眺めていたらダイクストラ法
で有名なエドガー・ダイクストラ
さんの名前を見かけた。自分の中では数学者という認識をしていたが、実は分散コンピューティングの分野で、どのようにシステムの信頼性を保証するかという手法を最初に提唱したこともある、バリバリのコンピューター科学の精通者だった。当時、go to
構文を用いてプログラミングを行なっていた従来の手法を改めて、現在、スタンダートになり、多くの人が使用するif文
などを提唱した人物らしく驚いた。
ダイクストラさんに興味を持ち、彼の文献を漁っていたところ「銀行家のアルゴリズム」たるものを発見。デットロックを回避するためのアルゴリズムであり、自分の興味関心のある並行処理の分野に通ずるものがあったため、試しに実装しようと思った。
銀行家のアルゴリズムについて
詳しいことは参考にしたwikipediaの記事を参照。簡単にだけまとめておく。
- 全体の資源を保持しているメインプロセスは倉庫を持っており、大量のN種類の備品が用意されている。
利用できる資源: A B C D 3 1 1 2
- それぞれ、合計Pの各企業に試供品してメインプロセスの倉庫からN種類の備品がそれぞれいくつかレンタルの形で届く。
プロセス(および現在割り当て済みの資源): A B C D P1 1 2 2 1 P2 1 0 3 3 P3 1 1 1 0
あまりにも、試供品が素晴らしかったため、全ての企業から備品に関する問い合わせが届いた。メインプロセスには担当者が一人しかいないため、順番に問い合わせをさばいていく。
P1社から問い合わせられた備品が倉庫に、それぞれ数があるかを確認してレンタル可能であれば、追加で備品を貸し出す。なお、各企業の契約状況によって貸し出せる備品の数は決まっている。
P1 がさらに Aを2、Bを1、Dを1 の資源を獲得し、最大値に到達する - システムは A1、B0、C1、D1の資源を利用可能である
- P1社のレンタル期間が満了して、メインプロセスからレンタルしていた資源を返却する。
P1 終了し、Aを3 Bを3, Cを2、Dを2返却する - システムはこの時点でA4、B3、C3、D3の資源を利用できる
この3 ~ 5の過程をPの分だけ繰り返す。もし問題なく全ての企業に備品を貸し出すことが出来たのであれば安全であると判断することが出来、どこかで備品が不足した際には安全ではないと判断する。
だいたい、こんな感じ
実装方法
もちろんElixir
を使う。今回はElixir(Erlang/OTP)
に用意されたGenServer
を使ってmessage passing
をサボってみることにする。
wikipediaにあるアルゴリズムの擬似コードにあるP - プロセスの集合
を真に受け取って、プロセスを1つ1つ立ち上げて、state server
に問い合わせるのが、インタリーブも複雑になり面倒だったので、プロセスは都度、立ち上げずに1つのプロセスから逐次依頼する形で実装する。
こうすればプロセスの処理の実行->終了->資源の解放の一連の処理を再現することも出来る。従って、wikipediaにあるforeach (p ∈ P)
に近い形での実装になる。
おそらく、実務の場合では、複数のプロセスが逐次的ではなく並列的にリクエストを飛ばしてくるのでもっと複雑なものになる。今回は簡単のために上記を採用した。
実行したコード
全体像はgithubにpushしているのでこちらからどうぞ。
補足的にコードの説明をしておきます。
GenServerに用意したもの
銀行家のアルゴリズムを実装するにあたり、3つのAPIを用意した。どれも同期実行のhandle_call
を使用している。(非同期はhandle_cast
)
- :init
- :request
- :state
- :return
init
:init
が使用された時に、GenServer
にあるstate
からリクエストにあった分だけを引き、次のstate
として使用する。
def handle_call({:init, %{"A" => _, "B" => _, "C" => _, "D" => _} = max_resource}, _from, state) do reply = Map.merge(state, max_resource, fn _, v1, v2 -> v1 - v2 end) {:reply, max_resource, reply} end
今回は簡単のため、リクエストにはA, B, C, D
以外のkeyのリクエストは受け付けないようにパターンマッチを使用している。
request
やっていることはほとんど:init
と同じ。1つだけ異なるのは、差分更新をする際に値がマイナスになってしまったkeyが存在しているかを確認している。マイナスになるkeyがあるということは資源が不足している事を意味しており、この時にGenServer
は:unsafe
を返す。
def handle_call({:request, %{"A" => _, "B" => _, "C" => _, "D" => _} = request}, _from, state) do reply = Map.merge(state, request, fn _, v1, v2 -> v1 - v2 end) {:reply, is_unsafe(Map.keys(state), reply), reply} end
マイナスのkeyが存在しているかをstate
のkeys
を元に再帰的に判定。
defp is_unsafe([], reply), do: {:ok, reply} defp is_unsafe([head | tail], reply) do if Map.get(reply, head) >= 0 do is_unsafe(tail, reply) else {:unsafe, reply} end end
state
現状のstate
を返すだけで特にstate
の更新は何もしていない。
def handle_call({:state}, _from, state) do {:reply, state, state} end
return
プロセスの処理が終了した時に、貸し出していた資源を回収するためのもの。受け取った値をkey毎にstate
に加えて、次のstate
とする。
def handle_call({:return, %{"A" => _, "B" => _, "C" => _, "D" => _} = return}, _from, state) do merge_state = Map.merge(state, return, fn _, v1, v2 -> v1 + v2 end) {:reply, merge_state, merge_state} end
BankersAlgo
ここからGenServer
に対して、都度リクエストを飛ばして、資源の取得、更新を行う。処理は逐次処理にて実行して、順番はwikipediaにあった通りに作っただけ。ややこしいことをしている部分としては、一連の処理をシナリオとしてmap
のデータ構造として定義しており、そのmap
からイベントを実行していく。
呼び出されるのは以下の関数で、値の取得、GenServer
の起動、シナリオの読み込みを行なっている。
def main(scenario_map) do # データ構造よりkeyを元に値を取得 {init_state, init_each_resource, scenario} = get_values(scenario_map) # GenServerを起動 {:ok, pid} = GenServer.start_link(Server, init_state) # Cpを再現するために、初回の資源割り振りを依頼 Enum.each(init_each_resource, fn {_, req} -> GenServer.call(pid, {:init, req}) end) # 'scenario'(配列)を1つずつ読み込み、資源が不足しているかを確認する genserver_request(scenario, init_each_resource, pid) end
defp genserver_request([], _init, _pid), do: :ok defp genserver_request([scenario | tail], init_each_resource, pid) do # シナリオにある'process'の値から、初期資源の量の値を取得する {process, init} = get_init_resource(scenario, init_each_resource) # シナリオに定義された最大資源の値を取得 max = Map.get(scenario, "request") # リクエスト = 最大資源 - 初期資源 request = Map.merge(max, init, fn _k, v1, v2 -> v1 - v2 end) case GenServer.call(pid, {:request, request}) do {:ok, _} -> # 持っている資源をGenServerに返す GenServer.call(pid, {:return, max}) # 初期資源をGenServerに返したとして値をクリアする updated_init = update_init_resource(init_each_resource, process) genserver_request(tail, updated_init, pid) {:unsafe, _} -> :unsafe end end
検証の実行
testファイルにサンプルがあります。そのテストを実行することで値の変遷を確認することが出来る。安全なケースと安全ではないケースを1つずつ用意してある。
$ mix test
Compiling 1 file (.ex) 10:51:09.540 [info] Set init state: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6} 10:51:09.542 [info] Available resource: %{"A" => 3, "B" => 0, "C" => 1, "D" => 2} 10:51:09.542 [info] ---> process request: P1 10:51:09.542 [info] Received request: %{"A" => 2, "B" => 1, "C" => 0, "D" => 1} 10:51:09.542 [info] Updated sever state: %{"A" => 1, "B" => -1, "C" => 1, "D" => 1} . 10:51:10.543 [info] Set init state: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6} 10:51:10.543 [info] Available resource: %{"A" => 3, "B" => 1, "C" => 1, "D" => 2} 10:51:10.543 [info] ---> process request: P1 10:51:10.543 [info] Received request: %{"A" => 2, "B" => 1, "C" => 0, "D" => 1} 10:51:10.543 [info] Updated sever state: %{"A" => 1, "B" => 0, "C" => 1, "D" => 1} 10:51:10.543 [info] Return resource: %{"A" => 3, "B" => 3, "C" => 2, "D" => 2} 10:51:10.543 [info] Received resource and merge: %{"A" => 4, "B" => 3, "C" => 3, "D" => 3} 10:51:10.543 [info] ---> process request: P2 10:51:10.543 [info] Received request: %{"A" => 0, "B" => 2, "C" => 0, "D" => 1} 10:51:10.543 [info] Updated sever state: %{"A" => 4, "B" => 1, "C" => 3, "D" => 2} 10:51:10.543 [info] Return resource: %{"A" => 1, "B" => 2, "C" => 3, "D" => 4} 10:51:10.544 [info] Received resource and merge: %{"A" => 5, "B" => 3, "C" => 6, "D" => 6} 10:51:10.544 [info] ---> process request: P3 10:51:10.544 [info] Received request: %{"A" => 0, "B" => 0, "C" => 4, "D" => 0} 10:51:10.544 [info] Updated sever state: %{"A" => 5, "B" => 3, "C" => 2, "D" => 6} 10:51:10.544 [info] Return resource: %{"A" => 1, "B" => 1, "C" => 5, "D" => 0} 10:51:10.544 [info] Received resource and merge: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6} . Finished in 2.1 seconds 2 tests, 0 failures Randomized with seed 378761
やりたいことはできたっぽい。