田舎で並行処理の夢を見る

試したこと、需要がないかもしれないけど細々とアウトプットしてます

【実装コード有り】銀行家のアルゴリズムの実装と検証

今更作った理由

たまたまgoogle scholarで並行処理に関する資料を眺めていたらダイクストラ法で有名なエドガー・ダイクストラさんの名前を見かけた。自分の中では数学者という認識をしていたが、実は分散コンピューティングの分野で、どのようにシステムの信頼性を保証するかという手法を最初に提唱したこともある、バリバリのコンピューター科学の精通者だった。当時、go to構文を用いてプログラミングを行なっていた従来の手法を改めて、現在、スタンダートになり、多くの人が使用するif文などを提唱した人物らしく驚いた。

ダイクストラさんに興味を持ち、彼の文献を漁っていたところ「銀行家のアルゴリズム」たるものを発見。デットロックを回避するためのアルゴリズムであり、自分の興味関心のある並行処理の分野に通ずるものがあったため、試しに実装しようと思った。

銀行家のアルゴリズムについて

詳しいことは参考にしたwikipediaの記事を参照。簡単にだけまとめておく。

  1. 全体の資源を保持しているメインプロセスは倉庫を持っており、大量のN種類の備品が用意されている。
利用できる資源:  
A B C D
3 1 1 2
  1. それぞれ、合計Pの各企業に試供品してメインプロセスの倉庫からN種類の備品がそれぞれいくつかレンタルの形で届く。
プロセス(および現在割り当て済みの資源):
   A B C D
P1 1 2 2 1
P2 1 0 3 3
P3 1 1 1 0
  1. あまりにも、試供品が素晴らしかったため、全ての企業から備品に関する問い合わせが届いた。メインプロセスには担当者が一人しかいないため、順番に問い合わせをさばいていく。

  2. P1社から問い合わせられた備品が倉庫に、それぞれ数があるかを確認してレンタル可能であれば、追加で備品を貸し出す。なお、各企業の契約状況によって貸し出せる備品の数は決まっている。

P1 がさらに Aを2、Bを1、Dを1 の資源を獲得し、最大値に到達する
- システムは A1、B0、C1、D1の資源を利用可能である
  1. P1社のレンタル期間が満了して、メインプロセスからレンタルしていた資源を返却する。
P1 終了し、Aを3 Bを3, Cを2、Dを2返却する
- システムはこの時点でA4、B3、C3、D3の資源を利用できる

この3 ~ 5の過程をPの分だけ繰り返す。もし問題なく全ての企業に備品を貸し出すことが出来たのであれば安全であると判断することが出来、どこかで備品が不足した際には安全ではないと判断する。

だいたい、こんな感じ

実装方法

もちろんElixirを使う。今回はElixir(Erlang/OTP)に用意されたGenServerを使ってmessage passingをサボってみることにする。

wikipediaにあるアルゴリズム擬似コードにあるP - プロセスの集合を真に受け取って、プロセスを1つ1つ立ち上げて、state serverに問い合わせるのが、インタリーブも複雑になり面倒だったので、プロセスは都度、立ち上げずに1つのプロセスから逐次依頼する形で実装する。
こうすればプロセスの処理の実行->終了->資源の解放の一連の処理を再現することも出来る。従って、wikipediaにあるforeach (p ∈ P)に近い形での実装になる。 おそらく、実務の場合では、複数のプロセスが逐次的ではなく並列的にリクエストを飛ばしてくるのでもっと複雑なものになる。今回は簡単のために上記を採用した。

実行したコード

全体像はgithubにpushしているのでこちらからどうぞ。

github.com

補足的にコードの説明をしておきます。

GenServerに用意したもの

銀行家のアルゴリズムを実装するにあたり、3つのAPIを用意した。どれも同期実行のhandle_callを使用している。(非同期はhandle_cast)

  • :init
  • :request
  • :state
  • :return

init

:initが使用された時に、GenServerにあるstateからリクエストにあった分だけを引き、次のstateとして使用する。

def handle_call({:init, %{"A" => _, "B" => _, "C" => _, "D" => _} = max_resource}, _from, state) do
  reply = Map.merge(state, max_resource, fn _, v1, v2 -> v1 - v2 end)
  {:reply, max_resource, reply}
end

今回は簡単のため、リクエストにはA, B, C, D以外のkeyのリクエストは受け付けないようにパターンマッチを使用している。

request

やっていることはほとんど:initと同じ。1つだけ異なるのは、差分更新をする際に値がマイナスになってしまったkeyが存在しているかを確認している。マイナスになるkeyがあるということは資源が不足している事を意味しており、この時にGenServer:unsafeを返す。

def handle_call({:request, %{"A" => _, "B" => _, "C" => _, "D" => _} = request}, _from, state) do
  reply = Map.merge(state, request, fn _, v1, v2 -> v1 - v2 end)
  {:reply, is_unsafe(Map.keys(state), reply), reply}
end

マイナスのkeyが存在しているかをstatekeysを元に再帰的に判定。

defp is_unsafe([], reply), do: {:ok, reply}
defp is_unsafe([head | tail], reply) do
  if Map.get(reply, head) >= 0 do
    is_unsafe(tail, reply)
  else
    {:unsafe, reply}
  end
end

state

現状のstateを返すだけで特にstateの更新は何もしていない。

def handle_call({:state}, _from, state) do
  {:reply, state, state}
end

return

プロセスの処理が終了した時に、貸し出していた資源を回収するためのもの。受け取った値をkey毎にstateに加えて、次のstateとする。

def handle_call({:return, %{"A" => _, "B" => _, "C" => _, "D" => _} = return}, _from, state) do
  merge_state = Map.merge(state, return, fn _, v1, v2 -> v1 + v2 end)
  {:reply, merge_state, merge_state}
end

BankersAlgo

ここからGenServerに対して、都度リクエストを飛ばして、資源の取得、更新を行う。処理は逐次処理にて実行して、順番はwikipediaにあった通りに作っただけ。ややこしいことをしている部分としては、一連の処理をシナリオとしてmapのデータ構造として定義しており、そのmapからイベントを実行していく。

呼び出されるのは以下の関数で、値の取得、GenServerの起動、シナリオの読み込みを行なっている。

def main(scenario_map) do
  # データ構造よりkeyを元に値を取得
  {init_state, init_each_resource, scenario} = get_values(scenario_map)
  # GenServerを起動
  {:ok, pid} = GenServer.start_link(Server, init_state)
  # Cpを再現するために、初回の資源割り振りを依頼
  Enum.each(init_each_resource, fn {_, req} -> GenServer.call(pid, {:init, req}) end)
  # 'scenario'(配列)を1つずつ読み込み、資源が不足しているかを確認する
  genserver_request(scenario, init_each_resource, pid)
end
defp genserver_request([], _init, _pid), do: :ok
defp genserver_request([scenario | tail], init_each_resource, pid) do
  # シナリオにある'process'の値から、初期資源の量の値を取得する
  {process, init} = get_init_resource(scenario, init_each_resource)
  # シナリオに定義された最大資源の値を取得
  max = Map.get(scenario, "request")
  # リクエスト = 最大資源 - 初期資源
  request = Map.merge(max, init, fn _k, v1, v2 -> v1 - v2 end)
  case GenServer.call(pid, {:request, request}) do
    {:ok, _} ->
      # 持っている資源をGenServerに返す
      GenServer.call(pid, {:return, max})
      # 初期資源をGenServerに返したとして値をクリアする
      updated_init = update_init_resource(init_each_resource, process)
      genserver_request(tail, updated_init, pid)
    {:unsafe, _} -> :unsafe
  end
end

検証の実行

testファイルにサンプルがあります。そのテストを実行することで値の変遷を確認することが出来る。安全なケースと安全ではないケースを1つずつ用意してある。

$ mix test

Compiling 1 file (.ex)

10:51:09.540 [info]  Set init state: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6}

10:51:09.542 [info]  Available resource: %{"A" => 3, "B" => 0, "C" => 1, "D" => 2}

10:51:09.542 [info]  ---> process request: P1

10:51:09.542 [info]  Received request: %{"A" => 2, "B" => 1, "C" => 0, "D" => 1}

10:51:09.542 [info]  Updated sever state: %{"A" => 1, "B" => -1, "C" => 1, "D" => 1}
.
10:51:10.543 [info]  Set init state: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6}

10:51:10.543 [info]  Available resource: %{"A" => 3, "B" => 1, "C" => 1, "D" => 2}

10:51:10.543 [info]  ---> process request: P1

10:51:10.543 [info]  Received request: %{"A" => 2, "B" => 1, "C" => 0, "D" => 1}

10:51:10.543 [info]  Updated sever state: %{"A" => 1, "B" => 0, "C" => 1, "D" => 1}

10:51:10.543 [info]  Return resource: %{"A" => 3, "B" => 3, "C" => 2, "D" => 2}

10:51:10.543 [info]  Received resource and merge: %{"A" => 4, "B" => 3, "C" => 3, "D" => 3}

10:51:10.543 [info]  ---> process request: P2

10:51:10.543 [info]  Received request: %{"A" => 0, "B" => 2, "C" => 0, "D" => 1}

10:51:10.543 [info]  Updated sever state: %{"A" => 4, "B" => 1, "C" => 3, "D" => 2}

10:51:10.543 [info]  Return resource: %{"A" => 1, "B" => 2, "C" => 3, "D" => 4}

10:51:10.544 [info]  Received resource and merge: %{"A" => 5, "B" => 3, "C" => 6, "D" => 6}

10:51:10.544 [info]  ---> process request: P3

10:51:10.544 [info]  Received request: %{"A" => 0, "B" => 0, "C" => 4, "D" => 0}

10:51:10.544 [info]  Updated sever state: %{"A" => 5, "B" => 3, "C" => 2, "D" => 6}

10:51:10.544 [info]  Return resource: %{"A" => 1, "B" => 1, "C" => 5, "D" => 0}

10:51:10.544 [info]  Received resource and merge: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6}
.

Finished in 2.1 seconds
2 tests, 0 failures

Randomized with seed 378761

やりたいことはできたっぽい。

参考文献