やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

【作って学ぶフロントエンド】ReactのuseStateの仕組みについて

Hooksについて

Reactのバージョン16.8に導入されたHooksという機能によって、class componentを使用せずとも、functional component(以降: FCと記述)にstateを用意することが可能になりました。

FCstateを用意するのは非常に簡単で以下の構文を記述するのみです。(公式のサンプルから引用)

import React, { useState } from 'react';

function Example() {
  // Declare a new state variable, which we'll call "count"
  const [count, setCount] = useState(0);

  return (
    <div>
      <p>You clicked {count} times</p>
      <button onClick={() => setCount(count + 1)}>
        Click me
      </button>
    </div>
  );
}

reactjs.org

useStateの戻りは配列で2つの要素を持っています。引数にはstateとして記録したい値の初期値を渡して上げれば良いようです。上記のサンプルでは0という値をセットしています。

useState(0);

userStateの戻り値は2つの要素を持つ配列です。1つ目の要素は実際にstateに記録した値です。上記のコードでは0countという変数に保持されています。そして2つ目の値が後に詳しく解説しますが、closureと呼ばれるstateを更新するための関数です。この関数に次のstateとして記録したい値を引数に渡して関数を実行することでstateの更新がされます。

...

当たり前のようにstateが用意出来て、更新も出来てしまっていますが、どんな仕組みになっているか気になりませんか?
どのようにFCが内部にstateを記録しているかという話を実際にコードを記述して解説しようと思います。コード量の少なさにきっと驚くことでしょう。
この仕組みを理解するために、まずは関数型プログラミングについて簡単に紹介します。

(このツイートが好評だったのでこの記事を書きました)

関数型プログラミングで出来ること

関数型プログラミングの全体像を説明すると非常に長くなってしまうので、useStateを理解するために必要な最低限の概念のみをご紹介します。関数型プログラミングでは以下の操作が可能になります。

変数に関数を代入する

// 変数への関数の代入
const double = x => {
    return x * 2;
};

// 変数には関数が代入されている
console.log(typeof(double)); // function
console.log(double(10)); // 20

関数の引数に関数を渡す

// 関数を引数に渡す
const lst = [1, 2, 3, 4, 5];
const apply = lst.map(num => num * 2);
console.log(apply); // [2, 4, 6, 8, 10]

map関数の第1引数に渡されているのはnum => num * 2という無名関数です。この引数に渡した関数を配列の全ての要素に適合(apply)させるのが、map関数の機能です。filterreduceなども同様に引数に関数を受け取ります。

関数の戻り値に関数を返す

カリー化と呼ばれる(Haskell Curryに敬意を表して命名された)方法を使って、足し合わせる値を固定して状態の関数(ここでは10)を返しています。こうすることで1つの関数から複数の機能を持つ関数を作成することが出来ます。

// 関数を戻り値として返す
const addN = N => num => num + N;

const add10 = addN(10);
console.log(typeof(add10)); // function
console.log(add10(3)); // 13


const add20 = addN(20);
console.log(add20(3)); // 23

うんうん、それで?

お気づきになりましたか。先ほどのサンプルコードの3つの「関数の戻り値に関数を返す」中で用意したadd10add20が内部にそれぞれ、10, 20という値を保持しているということに。10, 20の値はそれぞれ、addNを呼び出した初回実行時にのみ、代入した値です。それ以降は一度も引数経由で受け渡したり、環境変数などで更新は行っていません。

これがuseStateが内部で値(つまりはstate)を保持することが可能となる仕組みです。一度、この仕組みを利用してuseStateに初期値を受け渡す部分までを記述してみましょう。

useState version1.0の作成

まずは受け渡した値を内部のstateに記録する部分までを記述しました。

const useState = init => {
    const state = init;
    return state;
}

const count = useState(0);
console.log(count);

しかしながら、この方法では内部に値を保持しておくことが出来ずに、useStateを実行するたびに、別の値が用意されてしまいます。const countで用意した値(useState内のconst state)に対してアクセスする方法がないということを意味しています。
内部のstateにアクセスするための仕組みを用意します。先ほどの戻り値に関数を使用するという方法を使ってみましょう。

const useState = init => {
    const state = init;
    return () => {
        return state;
    }
}

const count = useState(0);
console.log(typeof(count)); // function
console.log(count()); // 0
console.log(count()); // 0

この方法ではuseStateの戻り値の関数を使うことで、useStateの内部に記録されているstateの値を覗き見(重要なのであえて覗き見という表現を採用)をすることが出来るようになりました。何回でも実行出来て、useState内部のstateが更新されることはありません。安心ですね。鍵のない取り出し不可能な金庫のようです。

...

しかし、困りました。本家useStateは戻り値の第2要素の関数を実行することで内部のstateを更新することが出来ました。次は、更新するための仕組みを用意していきましょう。

closureについて

一言で説明すれば、関数内に用意されている値を取得、更新するための専用の関数のことです。先ほど実装したuseStateの戻り値が実はclosureになっています。

const useState = init => {
    const state = init;
    return () => {
        return state;
    }
}

これは値を取得するためのclosureです。値を取得する事は出来ますが、更新することは絶対に出来ません。絶対に。
今の自分たちに必要なのは関数内に用意した値(state)を更新するためのclosureです。

closureについて触れている過去記事もありますので、合わせてご参照下さい。

www.okb-shelf.work

www.okb-shelf.work

useState version2.0の作成

内部の値を更新するために値(state)を更新する関数を返すようにします。

const useState = init => {
    let state = init;
    return (val) => {
        state = val;
        return state;
    };
}

const count = useState(0);
console.log(typeof(count)); // function
console.log(count(1)); // 1
console.log(count(2)); // 2

まず内部のstateの宣言を値の更新が行われるため、constからletに変更しました。合わせて、戻り値の関数は引数経由で更新したい値を受け取り、内部のstateを更新するようにします。この妙義がなせるのはreturnで返している関数からはlet stateに対してスコープが有効であるためです。逆に言えば、戻り値として受け取れる関数以外ではstateの値を更新することが出来ないということです。

これでほぼ全ての機能の実装が完了したので、本家useStateに従って戻り値を配列にしておきます。

const useState = init => {
    let state = init;
    return [
        state,
        (val) => {
            state = val;
            return state;
        }
    ];
};

const [count, setCount] = useState(0);
console.log(count); // 0
console.log(typeof(setCount)); // function

console.log(setCount(1)); // 1
console.log(setCount(2)); // 2
console.log(setCount(3)); // 3
console.log(count); // 0

いい感じですね。本家useStateを再現出来ました!!

最後に

たったこれだけのコードで高機能な関数を用意できるのは関数型プログラミングの成せる技です。useStateについて記述した記事ですが私自身、関数型言語を好んで使っていて、そのデザインに日々、感動しています。
少しでも「関数型プログラミング凄そう」と思って頂けたのであれば、以下の記事も覗いて見てください。

www.okb-shelf.work

www.okb-shelf.work

参考文献

並行・並列と並行処理・並列処理の違いについてまとめた

毎回調べているのでまとめた

何気なく「並行処理にすると....」だとか「ここは並列処理にして...」という会話をすることが普段多いが、「並行と並列って何が違うのよ?」と前からよく思っていた。その度に、ググっては「あー。そういうことね」と理解するものの、脳内ストレージが揮発性メモリであるため寝て起きたら忘れてしまう。 そんな、明日の自分のためにも

  • 並行と並列
  • 並行処理と並列処理

の違いについてまとめたものを記しておく。
なお、情報の多くは「書籍: 並行コンピューティング技法」を参考にしている。あと付属的にwikipediaやブログ記事など。参考文献にまとめているので、気になる方は参照して下さい。

www.oreilly.co.jp

並行コンピューティング技法についてのまとめ記事を過去に書いているので良ければ合わせて参照下さい。

www.okb-shelf.work

並行と並列の違い

過去記事にも記述しているため、そちらから引用する。

www.okb-shelf.work

並行とは複数の動作を実行可能状態に保てる状態を備えていること
並列とは複数の動作を同時に実行できる場合のこと

影分身をして100人分、一気に修行する(N●RUTO)

  • 並行 -> 修行を開始して修行中の状態
  • 並列 -> 100人に分身が完了した状態


つまりは「並行」と「並列」という言葉は概念であり、コンピューティングに限った話ではない。実際に、並行作業・並行世界、並列回路...だとか様々な派生した単語が存在している。
また先ほどの影分身での修行からも考えることが出来るが、並行という概念は並列という概念を内包している。「並列 ∈ 並行」となっており、並行という概念が実行可能状態を備えているということは100人への分身が完了した直後の状態(並列の概念を内包している)を経過していると考えてみれば自然ではないだろうか。

このように並行と並列というのは割と広い意味をもった概念であり曖昧性が高く、混乱を生みやすい。それに対して並行処理、並列処理はコンピューティングに寄った意味を持っており、おそらく多くの人が知りたいのはそちらになるだろう。

補足: 並行と並列の別意味

ググっているとこういう説明をしているものも多く見受けられる。

  • 並行 -> 複数の現象が同時、あるいはほぼ同時に起こること
  • 並列 -> 複数の現象が同時に起こること

確かに。
以下に説明をしている並行処理と並列処理について理解をしている人が見れば、この説明は妥当だ。「並列 ∈ 並行」という関係性にもなっている。

並行処理と並列処理

並行処理と並列処理は大きく意味が異なる。詳しいことは専門の書籍やスレッディングライブラリのドキュメントを読んでもらればと思うので、抽象的な説明だけをしておく。

並行処理とは1つのコアで高速にコンテキストスイッチ(どのプロセスなりスレッドが実行権限を持つかという権限の切り替え)を切り替えながら、あたかも複数の処理が同時に処理されているように実行する処理である。つまり、実際には本当に同時に複数の処理は実行されていない。並行処理を行う目的は複数の処理を同時に(見えるように)実行して他の処理(多くの場合はメイン処理)をwaitさせないことが目的で、計算処理などの高速化を主にしない。

それに対して並列処理は1つのコアでは実行されずに、プロセッサに存在するマルチコアを利用して(最近はコア数が凄いことになっている)実際に同時に処理を実行している。つまり、本当に同時に複数の処理が実行されている。並行処理とは違い、並列処理では高速化を目的とするため、大規模な演算処理に用いるのが良いだろう。

合わせて、マルチコアでないと並列処理が実行できないというハードウェアの制約もあるため、いつでも並列処理が出来るというわけではない。

おまけ

非同期処理について

せっかくなので、よく引き合いに出されている非同期処理についても説明をしておく。非同期処理は並行処理か並列処理のどちらを利用しても再現可能であり、概念的には並行に所属することになる。

メインの処理の実行とは別にマルチコアなりスレッドなりプロセスなりを利用してサブの処理を実行させたのであれば、それは非同期処理として考えて良いだろう。

参考文献

【Elixirのサンプルコード有り】条件に一致した時に再帰を停止する方法4つを書き比べてみた

なんでbreak_reduceみたいなのはないんだろう

配列(リスト)に対して、各要素を精査する際に、特定の条件にマッチする要素が見つかった時点で処理を停止して、Elixirであれば{:exist, value}のような値を返す関数をよく作成することがある。頻出の処理パターンではあるが、ElixirEnumを覗いてみても該当する関数は見当たらない。

(ありそうでない)

# ※これは架空の関数です
Enum.break([1,2,3,4,5], {:ok, 0}, fn n, _ -> n == 3 end)

自分は関数プログラミングの専門家ではないが、これは関数プログラミングがデータの変換を行うことを行うことを重視しており、reduceという関数が、列挙可能な値を全て確認した上でデータ変換を行うという性質であると考えれば、reduce関数の内部でbreakをするというのはイケてないことをしようとしているのではないかと考えることが出来る。
合わせてEnumは列挙可能なデータ構造(enumerables)に対して、処理を適応させるような関数をまとめたモジュールなため、途中で処理が停止して、残りの要素には何もしないという実行方法はミスマッチなんだろうか。

... という仮説を元に、indexを指定して、その値を取得するEnum.atの実装を見てみた。どうやら普通に再帰関数を使って停止させているようだ。仮説は誤っていたのだろうか。

defmodule Enum do
  def at(enumerable, index, default \\ nil) when is_integer(index) do
    case slice_any(enumerable, index, 1) do
      [value] -> value
      [] -> default
    end
  end

  defp slice_any(list, start, amount) when is_list(list) do
    list |> drop_list(start) |> take_list(amount)
  end

  defp drop_list(list, 0), do: list
  defp drop_list([_ | tail], counter), do: drop_list(tail, counter - 1)
  defp drop_list([], _), do: []
end

eg: list = [1, 2, 3, 4, 5], index = 2

-> Enum.at(list, index)
  -> slice_any([1, 2, 3, 4, 5], 2, 1)
    args = [1 | 2, 3, 4, 5], 2
    -> next fn([2, 3, 4, 5], 1)

    args = [2 | 3, 4, 5], 1
    -> next fn([3, 4, 5], 0)

    return [3, 4, 5]

  -> drop_list([3, 4, 5], 1)
    args = [3 | 4, 5], 1
    return [3]

  return [3]

# 最終的なreturn
3

Elixirを使っている立場から言えることは、「頻出の処理なのでEnumに実装して頂ければ有難いのですが....」ということ。
以下に同じ処理をいくつかの記述方法で実装したサンプルを載せているが、結果的にEnum.reduce関数を使わずとも、再帰関数を記述すれば、上記の処理をbreakする形式で記述することは出来る。要は面倒か、面倒ではないかのみ。

自分は一年程前に「関数型言語って副作用ないし、かっこいいし、流行ってる...」という理由で始めた不届き者です。今回の仮説と記述を通して、今更ながら「関数型言語とはどういうものなのか?」と改めて考える機会になり、非常に勉強になった。

qiita.com

ロックの例えが分かりやすかった。ロックとは何かを定義するよりも、ロックをたくさん聞いた方が早いというのはなるほどなぁと感じた。

サンプル

mapにマイナスの値を持つkeyが1つでも存在すれば初登場のkvのセットを返すという処理を例に示す。実際にこの処理は以下の記事で過去に記述した処理を採用している。

この記事を書いた時は、再帰を使うのか...Enum.reduceを使うのか...とかなり悩んだ。

www.okb-shelf.work

本来はパフォーマンスの比較のために複数のkeyの数が異なるデータを用意するべきだが、記事が長くなりすぎてしまうため、今回は割愛。シンプルに5つのkey(A, B, C, D, E)を持つmapを用意して4種類の記述方法を試してみる。

data = %{
    "A" => 1,
    "B" => 1,
    "C" => -1,
    "D" => 1,
    "E" => -1
}

条件として、mapにマイナスの値が存在しており、確認した際は{:exist, %{"C" => -1}}のような該当したデータを返し、存在しないのであれば{:ok, %{}}を返すようにする。

Enum.reduce編

普通にreduce使って書く。

Enum.reduce(data, {:ok, %{}}, fn {k, v}, acc -> 
    {_, acc_map} = acc
    if v < 0 do
        if Enum.count(acc_map) == 0 do
            {:exist, Map.put(acc_map, k, v)}
        else
            acc
        end
    else
        acc
    end
end)
|> IO.inspect()

# {:exist, %{"C" => -1}}

ちゃんとif else endを使用して返す値を明示的に記述してあげないと、nilが返ってしまうため、思わぬ結果となってしまうので注意。個人的にはインデントとコード行数が複雑になるので、簡潔な記述とは言えないのが残念。

再帰関数編

いつも自分が採用する形式。なぜ再帰関数を使っているかというと、条件に一致した時に再帰を停止させることが出来るため。普段使いするgolangpythonでいうところの以下の処理に該当するのでデータ数が大きくなればなるほど、Enum.reduceで記述したものとはパフォーマンスに差が現れてくるだろう。計算量自体は最悪時を考えるため、一緒になるが(O(N), N=mapのkeyの数)。

package main
import "fmt"
func main(){
    // Your code here!
    
    num := 3
    for i := 0; i< 10; i++ {
        if i == num {
            fmt.Println("hit: ", i)
            break
        }
    }
}
defmodule Sample do
    def reduce_break(map) do
        _reduce_break(Map.keys(map), map, {:ok, %{}})
    end
    defp _reduce_break([], _, acc), do: acc
    defp _reduce_break([head | tail], map, acc) do
        {_, acc_map} = acc
        if Map.get(map, head) < 0 do
            {:exist, Map.put(acc_map, head, Map.get(map, head))}
        else
            _reduce_break(tail, map, acc)
        end
    end
end

Sample.reduce_break(data)
|> IO.inspect()

# {:exist, %{"C" => -1}}

モジュール使ったので若干、コード量は増えてはいるが見通しの良さは引数パターンマッチのおかげだ。先ほども記述したように、条件に一致した際に、再帰を終了して値を返して関数の実行を終了する。

if Map.get(map, head) < 0 do
    # return value
    {:exist, Map.put(acc_map, head, Map.get(map, head))}
else
    # recursive: call own
    _reduce_break(tail, map, acc)
end

Enum.filter + Enum.reduce編

最近、意識するようになった記述方法でほぼ全ての処理をEnumとパイプラインで記述することが可能なため、「Elixirを書いてるんだ〜」という熱い思いを感じることが出来る。

filtered = Enum.filter(data, fn {k, v} -> 
    if v < 0 do
        {k, v}
    end
end)

if length(filtered) > 0 do
    Enum.at(filtered, 0)
    |> (fn {k, v} -> {:exist, Map.put(%{}, k, v)} end).()
    |> IO.inspect()
end

# {:exist, %{"C" => -1}} 

書き終えてから気づいたが、reduce使っていない。あと、パイプラインで無名関数使うのはスタイルガイド的にはnot preferredなので、乱用しないこと。

無限リストと遅延評価を使った(Stream.transform)実装

今回の記事を書く前まで無限リストと遅延評価について、全く知らない状態だったが、記事をまとめる中で完全に理解して、チョットワカル状態になった。 こちらの実装方法は普段、大変お世話になっているpiacereさんから教えて頂いたものだ。

執筆されている、こちらの記事も非常に分かりやすかったです。

qiita.com

Stream.transform(data, false, fn {k, v}, judge ->
  if judge do
    {:halt, judge}
  else
    if v < 0 do
        {[{:exist, Map.put(%{}, k, v)}], true}
    else
        {[], judge}
    end
  end
end)
|> Enum.to_list()
|> (fn lst -> 
        if length(lst) > 0 do
            Enum.at(lst, 0)
        else
            {:ok, %{}}
        end
    end).()
|> IO.inspect()

# {:exist, %{"C" => -1}}

注意点としては、Stream.transformは少し癖があり、条件に一致しない時は要素を2つ持つタプルを返す必要があり、第1要素にはリストを。第2要素には次の再帰で使用したアキュムレーターの値を渡す。最終的な戻り値は{:halt, _}を返す直前の条件に一致しなかった際に記述してきた{[], judge}の第1要素になる(これで少し迷った)。

無限ストリームを使用しているため、必要な要素分だけ判定されるため、再帰を用いてbreakをした時と同様のパフォーマンス(データ生成のパフォーマンスは考えない)になるのではないかと考えられる。

結論

パフォーマンスを意識するのなら再帰関数かStream.transformを使うのが良くて、Enumとパイプラインの強力なコンビネーションを使いたいのであれば、Enum.filter -> Enum.reduceを使うのが良いのではないだろうか。

個人的な推しは記述の楽さからEnum.filter -> Enum.reduceです。データ数が増えてきたら再帰に実装し直せばいいかなと思います。

参考文献

【実装コード有り】銀行家のアルゴリズムの実装と検証

今更作った理由

たまたまgoogle scholarで並行処理に関する資料を眺めていたらダイクストラ法で有名なエドガー・ダイクストラさんの名前を見かけた。自分の中では数学者という認識をしていたが、実は分散コンピューティングの分野で、どのようにシステムの信頼性を保証するかという手法を最初に提唱したこともある、バリバリのコンピューター科学の精通者だった。当時、go to構文を用いてプログラミングを行なっていた従来の手法を改めて、現在、スタンダートになり、多くの人が使用するif文などを提唱した人物らしく驚いた。

ダイクストラさんに興味を持ち、彼の文献を漁っていたところ「銀行家のアルゴリズム」たるものを発見。デットロックを回避するためのアルゴリズムであり、自分の興味関心のある並行処理の分野に通ずるものがあったため、試しに実装しようと思った。

銀行家のアルゴリズムについて

詳しいことは参考にしたwikipediaの記事を参照。簡単にだけまとめておく。

  1. 全体の資源を保持しているメインプロセスは倉庫を持っており、大量のN種類の備品が用意されている。
利用できる資源:  
A B C D
3 1 1 2
  1. それぞれ、合計Pの各企業に試供品してメインプロセスの倉庫からN種類の備品がそれぞれいくつかレンタルの形で届く。
プロセス(および現在割り当て済みの資源):
   A B C D
P1 1 2 2 1
P2 1 0 3 3
P3 1 1 1 0
  1. あまりにも、試供品が素晴らしかったため、全ての企業から備品に関する問い合わせが届いた。メインプロセスには担当者が一人しかいないため、順番に問い合わせをさばいていく。

  2. P1社から問い合わせられた備品が倉庫に、それぞれ数があるかを確認してレンタル可能であれば、追加で備品を貸し出す。なお、各企業の契約状況によって貸し出せる備品の数は決まっている。

P1 がさらに Aを2、Bを1、Dを1 の資源を獲得し、最大値に到達する
- システムは A1、B0、C1、D1の資源を利用可能である
  1. P1社のレンタル期間が満了して、メインプロセスからレンタルしていた資源を返却する。
P1 終了し、Aを3 Bを3, Cを2、Dを2返却する
- システムはこの時点でA4、B3、C3、D3の資源を利用できる

この3 ~ 5の過程をPの分だけ繰り返す。もし問題なく全ての企業に備品を貸し出すことが出来たのであれば安全であると判断することが出来、どこかで備品が不足した際には安全ではないと判断する。

だいたい、こんな感じ

実装方法

もちろんElixirを使う。今回はElixir(Erlang/OTP)に用意されたGenServerを使ってmessage passingをサボってみることにする。

wikipediaにあるアルゴリズム擬似コードにあるP - プロセスの集合を真に受け取って、プロセスを1つ1つ立ち上げて、state serverに問い合わせるのが、インタリーブも複雑になり面倒だったので、プロセスは都度、立ち上げずに1つのプロセスから逐次依頼する形で実装する。
こうすればプロセスの処理の実行->終了->資源の解放の一連の処理を再現することも出来る。従って、wikipediaにあるforeach (p ∈ P)に近い形での実装になる。 おそらく、実務の場合では、複数のプロセスが逐次的ではなく並列的にリクエストを飛ばしてくるのでもっと複雑なものになる。今回は簡単のために上記を採用した。

実行したコード

全体像はgithubにpushしているのでこちらからどうぞ。

github.com

補足的にコードの説明をしておきます。

GenServerに用意したもの

銀行家のアルゴリズムを実装するにあたり、3つのAPIを用意した。どれも同期実行のhandle_callを使用している。(非同期はhandle_cast)

  • :init
  • :request
  • :state
  • :return

init

:initが使用された時に、GenServerにあるstateからリクエストにあった分だけを引き、次のstateとして使用する。

def handle_call({:init, %{"A" => _, "B" => _, "C" => _, "D" => _} = max_resource}, _from, state) do
  reply = Map.merge(state, max_resource, fn _, v1, v2 -> v1 - v2 end)
  {:reply, max_resource, reply}
end

今回は簡単のため、リクエストにはA, B, C, D以外のkeyのリクエストは受け付けないようにパターンマッチを使用している。

request

やっていることはほとんど:initと同じ。1つだけ異なるのは、差分更新をする際に値がマイナスになってしまったkeyが存在しているかを確認している。マイナスになるkeyがあるということは資源が不足している事を意味しており、この時にGenServer:unsafeを返す。

def handle_call({:request, %{"A" => _, "B" => _, "C" => _, "D" => _} = request}, _from, state) do
  reply = Map.merge(state, request, fn _, v1, v2 -> v1 - v2 end)
  {:reply, is_unsafe(Map.keys(state), reply), reply}
end

マイナスのkeyが存在しているかをstatekeysを元に再帰的に判定。

defp is_unsafe([], reply), do: {:ok, reply}
defp is_unsafe([head | tail], reply) do
  if Map.get(reply, head) >= 0 do
    is_unsafe(tail, reply)
  else
    {:unsafe, reply}
  end
end

state

現状のstateを返すだけで特にstateの更新は何もしていない。

def handle_call({:state}, _from, state) do
  {:reply, state, state}
end

return

プロセスの処理が終了した時に、貸し出していた資源を回収するためのもの。受け取った値をkey毎にstateに加えて、次のstateとする。

def handle_call({:return, %{"A" => _, "B" => _, "C" => _, "D" => _} = return}, _from, state) do
  merge_state = Map.merge(state, return, fn _, v1, v2 -> v1 + v2 end)
  {:reply, merge_state, merge_state}
end

BankersAlgo

ここからGenServerに対して、都度リクエストを飛ばして、資源の取得、更新を行う。処理は逐次処理にて実行して、順番はwikipediaにあった通りに作っただけ。ややこしいことをしている部分としては、一連の処理をシナリオとしてmapのデータ構造として定義しており、そのmapからイベントを実行していく。

呼び出されるのは以下の関数で、値の取得、GenServerの起動、シナリオの読み込みを行なっている。

def main(scenario_map) do
  # データ構造よりkeyを元に値を取得
  {init_state, init_each_resource, scenario} = get_values(scenario_map)
  # GenServerを起動
  {:ok, pid} = GenServer.start_link(Server, init_state)
  # Cpを再現するために、初回の資源割り振りを依頼
  Enum.each(init_each_resource, fn {_, req} -> GenServer.call(pid, {:init, req}) end)
  # 'scenario'(配列)を1つずつ読み込み、資源が不足しているかを確認する
  genserver_request(scenario, init_each_resource, pid)
end
defp genserver_request([], _init, _pid), do: :ok
defp genserver_request([scenario | tail], init_each_resource, pid) do
  # シナリオにある'process'の値から、初期資源の量の値を取得する
  {process, init} = get_init_resource(scenario, init_each_resource)
  # シナリオに定義された最大資源の値を取得
  max = Map.get(scenario, "request")
  # リクエスト = 最大資源 - 初期資源
  request = Map.merge(max, init, fn _k, v1, v2 -> v1 - v2 end)
  case GenServer.call(pid, {:request, request}) do
    {:ok, _} ->
      # 持っている資源をGenServerに返す
      GenServer.call(pid, {:return, max})
      # 初期資源をGenServerに返したとして値をクリアする
      updated_init = update_init_resource(init_each_resource, process)
      genserver_request(tail, updated_init, pid)
    {:unsafe, _} -> :unsafe
  end
end

検証の実行

testファイルにサンプルがあります。そのテストを実行することで値の変遷を確認することが出来る。安全なケースと安全ではないケースを1つずつ用意してある。

$ mix test

Compiling 1 file (.ex)

10:51:09.540 [info]  Set init state: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6}

10:51:09.542 [info]  Available resource: %{"A" => 3, "B" => 0, "C" => 1, "D" => 2}

10:51:09.542 [info]  ---> process request: P1

10:51:09.542 [info]  Received request: %{"A" => 2, "B" => 1, "C" => 0, "D" => 1}

10:51:09.542 [info]  Updated sever state: %{"A" => 1, "B" => -1, "C" => 1, "D" => 1}
.
10:51:10.543 [info]  Set init state: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6}

10:51:10.543 [info]  Available resource: %{"A" => 3, "B" => 1, "C" => 1, "D" => 2}

10:51:10.543 [info]  ---> process request: P1

10:51:10.543 [info]  Received request: %{"A" => 2, "B" => 1, "C" => 0, "D" => 1}

10:51:10.543 [info]  Updated sever state: %{"A" => 1, "B" => 0, "C" => 1, "D" => 1}

10:51:10.543 [info]  Return resource: %{"A" => 3, "B" => 3, "C" => 2, "D" => 2}

10:51:10.543 [info]  Received resource and merge: %{"A" => 4, "B" => 3, "C" => 3, "D" => 3}

10:51:10.543 [info]  ---> process request: P2

10:51:10.543 [info]  Received request: %{"A" => 0, "B" => 2, "C" => 0, "D" => 1}

10:51:10.543 [info]  Updated sever state: %{"A" => 4, "B" => 1, "C" => 3, "D" => 2}

10:51:10.543 [info]  Return resource: %{"A" => 1, "B" => 2, "C" => 3, "D" => 4}

10:51:10.544 [info]  Received resource and merge: %{"A" => 5, "B" => 3, "C" => 6, "D" => 6}

10:51:10.544 [info]  ---> process request: P3

10:51:10.544 [info]  Received request: %{"A" => 0, "B" => 0, "C" => 4, "D" => 0}

10:51:10.544 [info]  Updated sever state: %{"A" => 5, "B" => 3, "C" => 2, "D" => 6}

10:51:10.544 [info]  Return resource: %{"A" => 1, "B" => 1, "C" => 5, "D" => 0}

10:51:10.544 [info]  Received resource and merge: %{"A" => 6, "B" => 4, "C" => 7, "D" => 6}
.

Finished in 2.1 seconds
2 tests, 0 failures

Randomized with seed 378761

やりたいことはできたっぽい。

参考文献

【Elixir/supervisor入門】通常のモジュールに対してsupervisorを定義する

なぜ書いたのか

Elixirにおいてプロセスの死活管理に用いるsupervisorについて学習を進めていたが、どれもGenServer、すなわちElixirbehaviorを用いたサンプルばかりだった。また、そのプロジェクトはmix new --sup project_nameとして生成されるもので、後から「supervisor入れたい」となる事が割とあったが自前でmixプロジェクトにsupervisorを定義する方法についてよく分かっていなかった。

気軽に自身で作成したElixirbehaviorを用いないモジュールに対してsupervisorを定義するための手順がまとまった日本語記事が確認出来なかったので自身のためにこの記事を手順のまとめとして作成した。

今回扱うサンプルについて

プロセスの起動時に特定のメッセージをメインのプロセスに指定回数だけ送信し、上限に達した際にプロセスをkillするという簡単なものを扱う。せっかくなので、message passingが扱えるものを採用してみた。参考にした記事ではアキュムレーターをデクリメントしてカウントが0になったら終了するというシンプルなものだったので少し拡張させた。

medium.com

プロセスの関係性は以下のようになる。
f:id:takamizawa46:20200418124309p:plain:w550

プロジェクトの作成

コードの全体はこちら。
github.com

今回はmixを使わずに、プレーンに作っていくため、まずは適当にディレクトリを作る。

$ mkdir supervisor_sample
$ cd supervisor_sample

次にexファイルを用意。ファイル分割を考えたが手間になるため、今回は1ファイルに全てを記述する。

$ touch supervisor_sample/supervisor_sample.ex

各プロセスにて実行するモジュール内関数

関数名はsupervisorの公式のサンプルにあるような形式に乗っ取ってそれぞれ、start_linkinitとしておくが現在の推奨バージョンでは、特に関数名の縛りはないため自由な命名をすることが出来る。参考にした記事の実装は現在は非推奨 となっているため、一部を書き換えている。

defmodule Child do
  @moduledoc """
    supervisorから起動されるタスクを実行するプロセス
    上限回数まで親プロセスにメッセージを送信して、上限に達したら終了してkillされる
  """

  @doc """
    supervisorから実行される関数。
    supervisorからlinkされたプロセスの生成を行う
  """
  def start_link(receiver, max_send) do
    # launch process link to supervisor process
    pid = spawn_link(__MODULE__, :init, [receiver, max_send])
    {:ok, pid}
  end

  def init(receiver, max_send) do
    # set seed for each process
    IO.puts("Start child with pid #{inspect(self())}")
    Process.sleep(200)
  end
end

これでsupervisorからこのモジュールが呼び出されてタスクを実行するプロセスが立ち上がるようになった。spawn_linkを使っているのはプロセスが死んだことをsupervisorに通知するため。
しかし、このままではプロセスが立ち上がって何もせずに終了してしまうため、initに親プロセスに対してメッセージを送信するための処理を追加する。指定回数まで実行させたいので新たな関数を定義する。

defmodule Child do
  def init(receiver, max_send) do
    # set seed for each process
    IO.puts("Start child with pid #{inspect(self())}")
    Process.sleep(200)
    sender(receiver, max_send)
  end

  @doc """
    上限回数まで再帰的にメッセージを親プロセスに送信する関数
  """
  def sender(_, 0), do: :ok
  def sender(receiver, max_send) do
    send(receiver, {:MESSAGE, "hello! from #{inspect(self())}"})
    sender(receiver, max_send-1)
  end
end

supervisorの起動時に受け取った親プロセスのPIDを元に{:MESSAGE, *本文}を送信する。再帰的に実行し、アキュムレーターの値が0になった時点で再帰を終了する。これでプロセスに実行させる処理の記述が完了した。

supervisorの定義

次に呼び出す元となる親プロセス、すなわちsupervisorの定義をする。ただ単にsupervisorbehaviorに従って実装しただけになる。

defmodule Parent do
  @moduledoc """
    supervisorの定義モジュール
  """
  use Supervisor

  @doc """
    supervisorの起動
  """
  def start_link(receiver, total_process) do
    Supervisor.start_link(__MODULE__, {receiver, total_process}, name: __MODULE__)
  end

  @doc """
    supervisorの設定と戦略をまとめた関数
  """
  def init({receiver, total_process}) do
    children = Enum.map(1..total_process, fn n ->
      %{
        id: "#{__MODULE__}_#{n}",
        # Childのstart_link関数に引数を渡して呼び出す。
        start: {Child, :start_link, [receiver, total_process]},
        restart: :permanent
      }
    end)

    # Aプロセスが死んだ時にAプロセスを復活させる -> 上限は10回(全プロセスで合算)
    Supervisor.init(children, strategy: :one_for_one, max_restarts: 10)
  end
end

実装が必要なのはstart_linkinit命名された2つの関数。start_linksupervisorの起動のための関数で、initsupervisorの設定、戦略をまとめた関数になる。start_linkに関しては特に手を加える部分はないが、第2引数に値を設定することでinit関数に引数を渡すことが可能となる。

init関数でsupervisorで管理するプロセスの設定をしている。設定を定義したmapを要素に持つリストを作成するか、Supervisor.child_spec/2という関数を用いる。どんな設定が出来るのかは下記に記述されている。特に重要なのはstart:の部分で、ここにsupervisorから起動させたいモジュールと関数名、引数値を指定する。TaskAgentで指定する形式と同じなので特に困惑する部分はない。

Supervisor.initではsupervisorの戦略を定義する。今回は特殊な設定はしておらず、死んだプロセスを1つ1つ都度、復活させる*one_for_one、最大復活プロセス数は10とした。

これにて実行するプロセスとsupervisorの設定が可能となった。

実行ファイル

最後に作成したsupervisorを気軽に呼び出すため、送信されてきたメッセージを受信するための再帰receiveを実装したwrapperモジュールを用意する。

defmodule SupervisorSample do
  @moduledoc """
    supervisorの呼び出しとメッセージの受信をサボるためのwrapperモジュール
  """

  @doc """
    supervisorの起動と再帰的メッセージ受信ループを実行
  """
  def launch(total_process) do
    # launch supervisor
    Parent.start_link(self(), total_process)
    receiver()
  end

  def receiver() do
    receive do
      {:MESSAGE, content} ->
        IO.puts("received message!: #{inspect(content)}")
        receiver()
    end
  end
end

実行結果

iexを立ち上げて、.exファイルをコンパイルして実行する。

$ iex

iex(1)> c("supervisor_sample.ex")  
[Child, Parent, SupervisorSample]  

今回は簡単のために立ち上げるプロセスは2つとして実行する。実行すると以下のようなlogが出力される。

iex(2)> SupervisorSample.launch(2)
Start child with pid #PID<0.116.0>
Start child with pid #PID<0.117.0>
Start child with pid #PID<0.118.0>
Start child with pid #PID<0.119.0>
received message!: "hello! from #PID<0.116.0>"
received message!: "hello! from #PID<0.116.0>"
received message!: "hello! from #PID<0.117.0>"
received message!: "hello! from #PID<0.117.0>"
received message!: "hello! from #PID<0.119.0>"
received message!: "hello! from #PID<0.119.0>"
Start child with pid #PID<0.120.0>
Start child with pid #PID<0.121.0>
received message!: "hello! from #PID<0.118.0>"
received message!: "hello! from #PID<0.118.0>"
Start child with pid #PID<0.122.0>
Start child with pid #PID<0.123.0>
received message!: "hello! from #PID<0.120.0>"
received message!: "hello! from #PID<0.120.0>"
received message!: "hello! from #PID<0.121.0>"
received message!: "hello! from #PID<0.121.0>"
received message!: "hello! from #PID<0.122.0>"
Start child with pid #PID<0.124.0>
Start child with pid #PID<0.125.0>
received message!: "hello! from #PID<0.122.0>"
received message!: "hello! from #PID<0.123.0>"
received message!: "hello! from #PID<0.123.0>"
received message!: "hello! from #PID<0.124.0>"
Start child with pid #PID<0.126.0>
Start child with pid #PID<0.127.0>
received message!: "hello! from #PID<0.124.0>"
received message!: "hello! from #PID<0.125.0>"
received message!: "hello! from #PID<0.125.0>"
received message!: "hello! from #PID<0.126.0>"
** (EXIT from #PID<0.104.0>) shell process exited with reason: shutdown

こいつをElixirで整形して何回プロセスが再起動されているかを確認してみると12となっている。

log =
"""
Start child with pid #PID<0.116.0>
:
:
received message!: "hello! from #PID<0.126.0>"
"""

log
|> String.split("\n")
|> Enum.filter(fn s -> String.starts_with?(s, "Start child with") end)
|> length()
|> IO.puts() # 12

これは最初に起動される2つのプロセスは当然ながら再起動のカウントに含まれていないということを意味している。どうやら上手くsupervisorが動作しているようだ。

github.com

参考文献