やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

【モジュールとの比較】Elixirで無名関数を使って再帰処理を記述する方法

無名関数では再帰処理が難しい

Elixir再帰関数を記述しようと思った際には、defmodule Fooと定義して、そのモジュール内部にdef barのように関数を定義して、パターンマッチもしくは、分岐処理によって再帰関数を処理するのが一般的。

defmodule Sample do
  def sum([], acc), do: acc
  def sum([head | tail], acc) do
    sum(tail, acc+head)
  end
end

Sample.sum([1,2,3], 0)
|> IO.puts()
# 6

しかし、わざわざモジュールに定義したくない時、対して使い回す予定もなく、1回きりの再帰処理が書きたい場合には上記のようにモジュールを定義して、再帰関数を記述するという方法は煩わしいとも言える。とすると、使い切りの関数として候補にあがるのは無名関数ということになる。
しかしながら、工夫なしに記述した無名関数では再帰処理を行うことが出来ない。それはなぜか。以下のコードを元に話を進める。

sum_ = fn lst, acc -> 
  case lst do
    [] -> acc
    [head | tail] -> sum_.(tail, acc+head) # <- ここでerror
  end
end

sum_.([1,2,3], 0)

実行結果

warning: variable "sum_func" does not exist and is being expanded to "sum_func()", please use parentheses to remove the ambiguity or change the variable name
  Main.exs:4

このerrorから読み取るに4行目に記述しているsum_.(tail, acc+head)が実行不可能であるということ。それはなぜかというと、sum_/2関数のスコープが無名関数の内部からは参照出来ないからということになる。つまり、Elixirの無名関数は自身を自身で参照することが出来ないということになる。それに対してモジュールに定義された関数は自身のスコープを参照可能ということになっている。

ということで一工夫

色々と試行錯誤して以下の形に落ち着いた。レシピとしては無名関数の内部で別の無名関数を定義して、その無名関数自身を引数に渡して、再帰的に呼び出すことで先ほどのスコープ対象外になる問題を解決した。全く勉強したことがないが、ラムダ計算の分野からの知恵を借りた。

sum_ = fn arg_lst ->
  # 自分自身を引数に受け取ることでスコープ問題を解消
  sub_sum = fn lst, acc, own_func ->
    case lst do
      # 終了条件を記述
      [] -> acc
      # 自分自身を呼び出し、引数にも自分自身を渡す
      [head | tail] -> own_func.(tail, acc+head, own_func)
    end
  end
  # 内部で定義した無名関数を実行
  sub_sum.(arg_lst, 0, sub_sum)
end

# ついでにアキュムレーターを内部化
sum_.([1,2,3])
|> IO.puts()
# 6

無名関数の内部で別の無名関数を定義して、その無名関数自体を、その無名関数の引数に渡すという頭がおかしくなりそうな一手間を加えることで無名関数でも再帰処理を行うことが出来る。合計値を求めるだけのシンプルな処理なので可読性は何とか保てているが、より複雑な処理を行うとなると、無名関数での再帰処理には首を傾げることになる。コードの行数もシンプルにしようとしているのに行数が増えてしまっている。

「使うな!」とまでは言わないが、趣味用であり、チーム開発での使用は避けるべきだと感じた。こういうの好きだけど。せっかく覚えた知識なので、いくつかサンプルを記述した。モジュールとの比較もしているので可読性の悪さを感じてみてほしい。

サンプル

リストの各要素をN倍に

無名関数ver

n_times = fn n, lst -> 
  sub_n_times = fn lst, acc, own_func ->
    case lst do
        [] -> acc
        [head | tail] -> own_func.(tail, acc ++ [head * n], own_func)
    end
  end
  sub_n_times.(lst, [], sub_n_times)
end

n_times.(2, [1,2,3])
|> IO.inspect()
# [2, 4, 6]

モジュールver

defmodule Sample do
  def n_times(p, lst) do
    _n_times(lst, p, [])
  end
  defp _n_times([], _, acc), do: acc
  defp _n_times([head | tail], p, acc) do
    _n_times(tail, p, acc ++ [head * p])
  end
end

Sample.n_times(2, [1,2,3])
|> IO.inspect()
# [2, 4, 6]

というかEnum.map/2で良いのでは説。

enum_map_ver = fn p, lst ->
  Enum.map(lst, fn n -> n * p end)
end

enum_map_ver.(2, [1,2,3])
|> IO.inspect()
# [2, 4, 6]

クイックソート

以前の記事で記述したクイックソート(先頭要素をpivotとして取得するversion)を無名関数の再帰処理を用いてリライトしてみた。書いた感想といてはモジュールの方が楽だなというのが正直な感想。書き切った時は嬉しいが、後に見返してみるとそれぞれの関数の引数のスコープがどこまで参照可能なのかが分かりにくく、やはり可読性に欠ける。

無名関数ver

quick_sort = fn base_lst -> 
  append = fn sorted_lst, pivot ->
    sub_append = fn lst, acc, own_func -> 
      case lst do
        [] -> acc ++ [pivot]
        [head | tail] -> 
          if pivot > head do
            own_func.(tail, acc ++ [head], own_func)
          else
            acc ++ [pivot, head] ++ tail
          end
      end
    end
    sub_append.(sorted_lst, [], sub_append)
  end
  Enum.reduce(base_lst, [], fn p, acc -> 
    append.(acc, p)
  end)
end

quick_sort.([8,2,6,5,7,3,1,4,9])
|> IO.inspect()
# [1, 2, 3, 4, 5, 6, 7, 8, 9]

モジュールver

defmodule Algo do
  def quick_sort(lst), do: _quick_sort(lst, [])
  defp _quick_sort([], accum), do: accum
  defp _quick_sort([head | tail], accum) do
    res = append(head, accum, [])
    _quick_sort(tail, res)
  end
  def append(num, [], accum), do: accum ++ [num]
  def append(num, [head | tail], accum) do
    case num > head do
      true -> append(num, tail, accum ++ [head])
      false -> accum ++ [num] ++ [head] ++ tail
    end
  end
end

おまけ

こんな風に記述しても無名関数で再帰処理を行うことが出来るが、使用者に対して、引数に無名関数を渡すことを意識させる必要があるため、ナンセンスだとは思うが、分かっていて使うのであれば、コード量は減るので悪くはないと思う。

sum_ = fn lst, acc, func -> 
  case lst do
    [] -> acc
    [head | tail] -> func.(tail, acc+head, func)
  end
end

sum_.([1,2,3], 0, sum_)
|> IO.puts()
# 6

参考文献

【使用例あり(for文vs内包表記)】Pythonでデコレーターを使って関数の平均実行時間を測定する

実装したデコレーター用の関数

全体のコードはこちら
github.com

すでに似た様な記事が多く記述されているが、指定回数の平均実行時間を出力したいのは自分が初めてだと信じて実装してみた。pythonデコレーターを使用して、指定回数の平均実行時間を算出することが出来る。デコレーターに関する詳しい説明はすでに多くの記事が存在しているのでそちらにおまかせします。

./bench.py

import time
import logging

# benchmark用の関数
def benchmark(try_num):
    """
        TASK: デコレーターを行い関数に対して指定回数の実行を行う
        lst: []int, []float -> 対象の配列
        try_num: int -> 実行回数の合計数
        return ([]float, float) -> (実行結果を格納した配列, 平均値)
    """
    def _timer(exec_func, *args, **kwargs):
        """
            TASK: デコレーター経由で受け取った関数を実行し、実行時間を返す
            exec_func: function -> 実行したい関数
        """
        start = time.time()
        exec_func(*args, **kwargs)
        end = time.time()
        return end - start

    # デコレーターで装飾した関数を受け取る関数
    def _benchmark(exec_func):
        """
            TASK: 関数を受け取り、指定回数実行するサブ関数
            exec_func: function -> 実行したい関数
            return []float
        """
        # 装飾した関数の引数を受け取りベンチマークを実行する関数
        def _sub_bencmark(*args, **kwargs):
            # 計測結果
            logging.info(f"Start benchmark  「{exec_func.__name__}」 ...")
            messured = [_timer(exec_func, *args, **kwargs) for _ in range(0, try_num)]
            average = sum(messured) / len(messured)
            logging.info("Result: Average execution time -> {:.5f} s (total exec {})".format(average, try_num))
            return exec_func(*args, **kwargs)

        return _sub_bencmark

    return _benchmark

サンプル

from bench import benchmark
import logging

# loggerの設定(defaultでDEBUG, INFOは出力は出力されないため必須)
formatter = '%(levelname)s : %(asctime)s : %(message)s'
logging.basicConfig(level=logging.INFO, format=formatter)

@benchmark(100)
def sample(s):
    return s

sample("nice")

実行結果

INFO : 2020-03-14 20:38:35,967 : Start benchmark  「sample」 ...
INFO : 2020-03-14 20:38:35,967 : Result: Average execution time -> 0.00000 s (total exec 100)

@benchmark(10)と対象の関数の上部に記述することで デコレーターとなり、対象の関数を引数で受け取ることが出来る。ただ、今回は実行する回数を指定して、平均値を算出したいので実行回数を受け取る専用の関数(benchmark)でwrapしている。関数の内部化についてはやや複雑になっているが、

  • 実行回数を受け取る
  • 実行したい関数を受け取る
  • 実行したい関数に渡す引数を受け取る
  • 指定回数だけ実行して平均実行時間をlogに出力

という流れで実行しているだけなので大したことはやっていない。注意としては結果は従来の関数の戻り値に影響を与えないためログに出力している。

使用例: for文内包表記map関数での速度比較

よく「Pythonfor文は遅い。内包表記を使おう」と言われるが、それが本当なのかを先ほど作成したベンチマーク用のデコレーターを用いて検証してみよう。ついでなので、map関数も比較対象に追加した。

配列の各要素を2倍するという処理をfor文, 内包表記, map関数のそれぞれを用いて実装した。それぞれ実行した関数に対して先ほど作成したデコレーターを記述してやる。今回は10,000回実行での平均速度を比較してみよう。

./funcs.py

from bench import benchmark

TRY_NUM = 10000

@benchmark(TRY_NUM)
def for_double(lst):
    """
      TASK: forを使った配列の要素を全て2倍にする関数
      lst: []int, []float -> 対象の配列
      return []int, []float
  """
    res = list()
    for n in lst:
        res.append(n * 2)
    return res

@benchmark(TRY_NUM)
def comprehension_double(lst):
    """
      TASK: 内包表記を使った配列の要素を全て2倍にする関数
      lst: []int, []float -> 対象の配列
      return []int, []float
  """
    return [n * 2 for n in lst]

@benchmark(TRY_NUM)
def map_double(lst):
    """
      TASK: map関数を使った配列の要素を全て2倍にする関数
      lst: []int, []float -> 対象の配列
      return []int, []float
  """
    return list(map(lambda x: x * 2, lst))

これで関数側の用意は完了。次に実行ファイルを作成する。

./main.py

from funcs improt (
  for_double,
    comprehension_double,
    map_double
)

import logging

# loggerの設定(defaultでDEBUG, INFOは出力は出力されないため必須)
formatter = '%(levelname)s : %(asctime)s : %(message)s'
logging.basicConfig(level=logging.INFO, format=formatter)

def create_data(total_num):
    """
      TASK: テスト用の指定要素数を持つ配列を作成する関数
      total_num: int -> 配列要素数
      return []int
  """
    return [n for n in range(1, total_num+1)]

if __name__ == "__main__":
    # 各種の関数を実行
    data_for_benchmark = create_data(10000)

    # benchmarkを実行
    for_double(data_for_benchmark)
    comprehension_double(data_for_benchmark)
    map_double(data_for_benchmark)

では、結果を確認してみよう。

$ python3 main.py

INFO : 2020-03-14 20:53:47,441 : Start benchmark  「for_double」 ...
INFO : 2020-03-14 20:54:04,680 : Result: Average execution time -> 0.00172 s (total exec 10000)
INFO : 2020-03-14 20:54:04,692 : Start benchmark  「comprehension_double」 ...
INFO : 2020-03-14 20:54:13,866 : Result: Average execution time -> 0.00091 s (total exec 10000)
INFO : 2020-03-14 20:54:13,868 : Start benchmark  「map_double」 ...
INFO : 2020-03-14 20:54:28,890 : Result: Average execution time -> 0.00150 s (total exec 10000)

おお、やはり内包表記はかなり速い。数値としては約2倍ほど速いようだ。以外だったのはmap関数for文とほとんど同速であったということで、この結果の他に何度か検証を行なったが、for文が速い時もあればmap関数が速い時もあった。しかしながら内包表記は常に一番速かった。

こんな感じで、今回、実装したデコレーター用のベンチマーク関数を用いることで関数の平均実行速度を気軽に測定することが出来るので、ぜひ使ってくみて下さいね。

おまけ1: for文, 内包表記, map関数の別結果

INFO : 2020-03-14 20:53:17,841 : Start benchmark  「for_double」 ...
INFO : 2020-03-14 20:53:27,750 : Result: Average execution time -> 0.00099 s (total exec 10000)
INFO : 2020-03-14 20:53:27,753 : Start benchmark  「comprehension_double」 ...
INFO : 2020-03-14 20:53:33,386 : Result: Average execution time -> 0.00056 s (total exec 10000)
INFO : 2020-03-14 20:53:33,389 : Start benchmark  「map_double」 ...
INFO : 2020-03-14 20:53:44,338 : Result: Average execution time -> 0.00109 s (total exec 10000)
INFO : 2020-03-14 21:01:10,190 : Start benchmark  「for_double」 ...
INFO : 2020-03-14 21:01:22,041 : Result: Average execution time -> 0.00118 s (total exec 10000)
INFO : 2020-03-14 21:01:22,045 : Start benchmark  「comprehension_double」 ...
INFO : 2020-03-14 21:01:28,320 : Result: Average execution time -> 0.00062 s (total exec 10000)
INFO : 2020-03-14 21:01:28,328 : Start benchmark  「map_double」 ...
INFO : 2020-03-14 21:01:40,712 : Result: Average execution time -> 0.00124 s (total exec 10000)

おまけ2: 内包表記map関数が好きな人へ

Pythonの内包表記は関数型言語であるHaskellからの輸入品かつ、map関数も同じく関数型言語からの輸入品として有名です。少しでも興味があれば私が押しているElixirという言語について、ぜひ覗いてみて下さい。きっと気に入って頂けると思います。

www.okb-shelf.work

www.okb-shelf.work

参考文献

【サンプルコード有り】golangで三項演算子っぽいものを記述する方法について

嫌だなぁと感じる場面

業務を進める中で、以下の様なコードを書くことが多かった。

drink := ""
if orderNum {
    drink = "green tea"
} else {
    drink = "tea"
}

orderNumの様な関数の引数で指定される値であったり、http経由で指定されたqueryによって値をセットする処理なのだが、事前にdrinkという同じ型に該当する初期値となる変数を用意しておく必要があり個人的には好きではない。しかしながら、これがgolangの言語デザインでもあり、QiitaRui Ueyamaさんも言及している。

三項演算子があればなぁ...」と思い、何とかならないかなとあがいた結果が以下になる

無名関数を使った記述方法

先ほどのコードを無名関数を使った三項演算子っぽいものに書き換えてみた。

func Sample(orderNum int) string {
    // 無名関数を即時実行する
    drink := func(n int) string {
        if n == 1 {
            return "green tea"
        } else {
            return "tea"
        }
        // 即時実行のために引数を指定
    }(orderNum)

    // 無名関数の実行結果を返す
    return drink
}

やっていることは大したことはなくて、無名関数を定義して、その無名関数の戻り値を任意の変数に代入にしているだけ。わざわざ、外部に関数を定義するまでもないので無名関数で良くないかという判断に至った。nの値によって戻り値のパターンを増やしたいのならswitch文の採用も考える。無駄な変数を宣言する必要がないため、割と気に入っている。

パフォーマンスについて

気になるのは、無名関数を使った記述方法にすることで、どれだけオーバーヘッドになってしまうのかという点。普通に考えて、無名関数を使った方がパフォーマンスは悪いだろうなと思うが、考えても分からないので測定してみた。
それぞれ無名関数を使用する記述と、従来の記述とで1万回ずつ測定して、平均値を算出した。

比較のためにそれぞれを関数内に内包して、それぞれの関数の実行速度を比較するとする。

func AnonymousFunctionSample(orderNum int) string {
    // 無名関数を即時実行する
    drink := func(n int) string {
        if n == 1 {
            return "green tea"
        } else {
            return "tea"
        }
        // 即時実行のために引数を指定
    }(orderNum)

    // 無名関数の実行結果を返す
    return drink
}

func NormalSample(orderNum int) string {
    drink := ""
    if orderNum == 1 {
        drink = "green tes"
    } else {
        return "tea"
    }
    return drink
}

検証に使用したコードはこちら。
github.com

実行結果

$ bash benchmark.sh

[info] clear go test cached...
[info] start benchmark
=== RUN   TestSample
[info] average for anonymous function:  9.854730000000441e-08
[info] average for normal exec:  1.0573470000000177e-07
--- PASS: TestSample (0.01s)
PASS
ok      ternary_operator/src    0.015s

1万回平均で見ると、無名関数を使用した記述方法の方が速いようだ。
可読性は好き嫌いが分かれる所だと思うが、良ければ使ってください。

参考文献

【並行処理vs逐次処理】プロセスを事前に立ち上げることによって高速化されるのか【めちゃ速】

前回の話

「並行コンピューティング技法」を読み進める中で「並行和」の実装をElixirを用いて行なった。何とか実装が完了した並行和の処理を複数プロセスを立ち上げてチャンクを分割して実行する並行処理(parallel)と、チャンクを分割せずにメインの1つのプロセスのみで実行する逐次処理(serial)と比較した所、何と並行処理よりも逐次処理の方が約10000倍も速いという残念な結果に終わってしまった。完全敗北。

www.okb-shelf.work

どうすればパフォーマンスが向上するか

この旨を呟いた所、Yamazakiさんよりアドバイスを頂きました。(いつも有難うございます🙇‍♂️)

なるほど...。プロセスの起動と終了がボトルネックになり得るとは...。
Elixirは気軽にプロセス生成が強みの1つであり、ユースケースの1つとして個人的に考えているのは「大量のデータをチャンクに分割し、チャンク毎に対応するプロセスを立ち上げて前処理をゴリゴリと行う」という使い方。しかしながら、プロセス生成がボトルネックになるというのは中々につらい。
逆に言えば、サーバー起動時に常駐のプロセスが立ち上がり、それぞれがタスクを担うような処理に向いているということにはなる。

今回は実際にプロセスの起動と終了の内、実行時のプロセス起動がどれだけパフォーマンスに影響を与えているかを確認するために、あらかじめプロセスをベンチマーク測定前に立ち上げた状態で並行和の処理を行うように前回のベンチマークを改良してみた。

ベンチマークの実装

(結果が気になる方はこの章はすっ飛ばしてください)
コードの全体像はこちら
github.com

プロセスを実行時前に立ち上げることによって様々な問題が発生することが実装を進める中で次々に発覚して、ほとんど新しく書き直すこととなった。まずベンチマークに使用する検証用データ同様にプロセスを立ち上げて得られるプロセスのPIDをモジュール変数に保持させておく必要がある。

defmodule Sample do
  @processes_pid [1,2,3....]
end

この部分でかなりやられた。まず第一に今までプロセスの起動と実行終了結果を受け取るために使用してTaskが使用出来ないことが分かり、Elixirmessage passingを利用してフルスクラッチで実装する方針を取った。(Taskが使えない理由についてはおまけにて解説)
そうなると必然的に、プロセス間でのmessage passingをコントロールする必要があるため、

  • メインプロセス
  • 実行結果を受け取る受信用のプロセス -> 全てのプロセスの終了を確認したらメインプロセスに結果を送信
  • タスクを実行するN個(N -> データ数をチャンク数で割った数)のプロセス -> 終了次第、受信用プロセスに結果を送信

このように各プロセスの用意とmessageの定義をすることになった。詳しくは解説しないが、ざっくりとコードの核部分だけを見せておく。

受信用プロセスのための関数
./parallel_sum/lib/sum.ex

@doc """
    生成したプロセスによって算出した合計値を再帰関数のアキュムレーターに記録
  """
  def receiver(process_num, pid) do
    sum = _receiver(process_num, [])
    send(pid, {:ok, sum})
  end
  defp _receiver(0, accum), do: accum
  defp _receiver(process_num, accum) do
    receive do
      {:ok, sum} -> _receiver(process_num-1, [sum] ++ accum)
    # 指定時間以上経過した場合にプロセスを停止して終了
    after 1000 * ParallelSum.Config.waiting_limit_time_for_process() ->
      Logger.warn("[receiver] time over. so kill process")
    end
  end

起動時に生成するプロセス数と全ての結果の受信が完了した際にメインプロセスに結果を集約した結果を送信するためにメインプロセスのPIDを引数に持つ。再帰的に処理を行い、各プロセスから送信されてきた結果をアキュムレーターにて保持。カウンターの値が0になった時にメインのプロセスに最終的な結果を送信する。

集計を行うプロセスのための関数
./parallel_sum/lib/sum.ex

@doc """
    コマンドメッセージを受信するまで算出処理を行わないwrap関数
    チャンクはプロセス起動時に引数で受け取る
  """
  def waiting_sum(lst) when is_list(lst) do
    receive do
      {:recursive, pid} ->
        sum = recurcive_sum(lst)
        send(pid, {:ok, sum})
        # プロセスが停止するとベンチが行えなくなるため再帰
        waiting_sum(lst)
      {:enum, pid} ->
        sum = sum(lst)
        send(pid, {:ok, sum})
        # プロセスが停止するとベンチが行えなくなるため再帰
        waiting_sum(lst)
        # 指定時間以上経過した場合にプロセスを停止して終了
    after 1000 * ParallelSum.Config.waiting_limit_time_for_process() ->
      Logger.warn("[waiting_sum] time over. so kill process")
      exit(:time_over)
    end
  end

チャンク分割したデータ(配列)を受け取り、立ち上げるタスクに当たる関数。message passingによってチャンクを受け取る方法も考えたし、後に実装するつもりではあるが、今回はプロセスの事前立ち上げに焦点を当てているため引数経由でプロセス立ち上げ時にチャンクを受け取る方法を採用した。再帰的な処理を行わせているのはかなり特殊な対応で、benchfellaが同じベンチマークを上限時間内に何度も実行するため、再帰的に処理が行われるようにしておかねば2度目のベンチマーク時には事前に立ち上げたプロセスが誰一人、存在していないという状態になってしまうからになる。

肝となる部分がこの関数で、事前に立ち上げたプロセスがそれぞれ、集計開始のメッセージを受け取るまで待機するように作成してある。{:recursive, pid}{:enum, pid}というメッセージを受け取ることで初めて並行和の処理を開始する。

メインプロセスのための関数
./parallel_sum/bench/init_sum_bench.exs

# 共通処理を関数化
  def bench_helper(process_info, type_atom) do
    {process_num, calc_processes} = process_info
    # 受信用のプロセスを起動
    # receiver = spawn(ParallelSum.Sum, :receiver, [process_num, self()])
    self_pid = self()
    receiver = spawn(fn -> ParallelSum.Sum.receiver(process_num, self_pid) end)

    # それぞれのプロセスに算出開始のメッセージを送信
    Enum.each(calc_processes, fn pid ->
      send(pid, {type_atom, receiver})
    end)

    # 算出値を受信して合計
    receive do
      {:ok, sum} -> Enum.sum(sum)
    end
  end

立ち上げたプロセス情報(プロセス数, PIDを保持した配列)を受け取り、「算出値の受信用プロセス」を起動させて、「集計処理のために引数経由でチャンクを受け取り起動済みのプロセス」それぞれに対してPIDを指定して集計開始のmessageを送信する。そして、最終的に各プロセスから受け取った結果をまとめたmessageを受信用のプロセスから受け取り、メインプロセスにて値を合計して終了する。

モジュール変数で使用している関数はこちら。大したことはしていないのでコードのリンクを貼っておきます。
github.com

いざ測定

ベンチマークを行行う前に設定条件について説明しておく。配列の要素数100万程度(10Nのrangeで実行)の場合にベンチマークが制限時間内に終了しない問題にぶち当たり、これはコンピューターのスペック的に厳しいと判断。今回のベンチマークの配列の要素数の最大数は10万とし、1プロセスあたりが扱うチャンクの要素数100としたので、並行和を実行するプロセスは最大で1万個生成されたということになる。普通にめっちゃ多くね。

実行したベンチマークはこちら
github.com

設定条件の一覧

項目 設定値
最大配列要素数 100,000
生成する要素値の最大数(int) 1,000
1プロセスあたりの最大要素数 100
最大生成プロセス数 10,000
タスク実行最長許容時間 20seconds

結果

再帰関数を使用して並行和を算出した結果
f:id:takamizawa46:20200306230052p:plain

Enum.sum()を使用して並行和を算出した結果
f:id:takamizawa46:20200306230107p:plain

前回の結果(プロセスを事前立ち上げしないケース)

f:id:takamizawa46:20200223211206p:plain f:id:takamizawa46:20200223211212p:plain

.snapshot

ParallelSum.InitBench parallel: enum 100        500000 2555564
ParallelSum.InitBench  parallel: enum 1000       100000 2902751
ParallelSum.InitBench  parallel: enum 10000      10000  1819477
ParallelSum.InitBench  parallel: enum 100000     1000   1781611
ParallelSum.InitBench  parallel: recursive 100       500000 2517027
ParallelSum.InitBench  parallel: recursive 1000      100000 2878593
ParallelSum.InitBench  parallel: recursive 10000     10000  1834528
ParallelSum.InitBench  parallel: recursive 100000        1000   1710872

数値での比較

前回

算出方法 データ数 1回あたりの実行時間(µs/op)
enum 100 33.83112
enum 1000 349.8089
enum 10000 17106.37
enum 100000 4650929
recursive 100 22.67772
recursive 1000 243.1725
recursive 10000 10893.83
recursive 100000 1575123

今回

算出方法 データ数 1回あたりの実行時間(µs/op)
enum 100 5.111128
enum 1000 29.02751
enum 10000 181.9477
enum 100000 1781.611
recursive 100 5.034054
recursive 1000 28.78593
recursive 10000 183.4528
recursive 100000 1710.872

素数が10万の時での比較

算出方法 前回 今回 何倍速くなったか
enum 4650929 1781.61 約2610倍
recursive 1575123 1710.872 約920倍

考察と感想

確かにアドバイス頂いた通りにElixirではプロセスの起動部分が、並行化におけるかなりのオーバーヘッドになっているのはこの結果からも明らか。それにしても驚いた。こんなに速くなるものとは...。あと気になるのはチャンク分割がどれだけのオーバーヘッドになり得るかということで、事前にプロセスだけを立ち上げておいて、message passingにてチャンクデータを受け取る方式でのベンチマークも非常に気になる。

本来ならば別言語実装の同内容のベンチマークと結果を比較するべきだろうが、並行処理を初めて1年にも満たない人間が、気軽にプロセスを立ち上げて、気軽に並行処理を行えるElixirのストレスフリーなデザインは凄い。

またElixirで並行処理のパフォーマンスを活かすためには下記のような場合に適しているのではないか。

  • 実行時前にプロセスを立ち上げられる設計がされている -> eg: サーバー起動時にプロセスを立ち上げ
  • 事前にプロセスを立ち上げるべき目的になっている -> eg: 常駐プロセス

Elixir(Erlang)の採用事例を数多く見てきたが、GateWayであったり、stream serverであったり、プッシュ通知であったり...と、今回得られた知見とフィットしている。

次はmessage passingによってチャンクデータを受け取る方法でのベンチマークの測定を目指すこととする。

おまけ: なぜTaskが使えなかったのか

まず以下のログからTaskasync()で起動された後に、await()で結果を取得するまでの間に待機状態となるのではなく、適宜処理を実行していることが分かる。

iex(1)> Task.async(fn -> IO.puts("nice boat") end)
nice boat
%Task{
  owner: #PID<0.104.0>,
  pid: #PID<0.106.0>,
  ref: #Reference<0.3871736042.2998665217.74532>
}

これは特に問題ないのだが、致命的な問題が2つある。

Task.await()が実行出来るプロセスの縛り

以下の検証用モジュールを用意してmain_validaterを実行するとerrorになる。どうやら公式ドキュメントにもある通り、Task.await()Task.async()を用いてTaskを生成したPIDを持つプロセス内部でしか実行出来ない様だ。プロセスがリンクされているのだろうか。

When invoked, a new process will be created, linked and monitored by the caller. Once the task action finishes, a message will be sent to the caller with the result.

defmodule ParallelSum.TaskConf do
  @moduledoc """
    Taskモジュールを使用して生成されたプロセスは生成したプロセスによってしか回収できないかを確認
  """

  @doc """
    Taskを生成してmessage passing形式で返す
  """
  def create_task(pid) do
    :timer.sleep(1000)
    task_info = Task.async(fn -> :ok end)
    send(pid, {:ok, task_info})
  end

  @doc """
    自身のpidを用いてTask.awaitが問題なく実行可能かどうか
  """
  def main_validater() do
    pid = self()
    spawn(fn -> create_task(pid) end)
    receive do
      {:ok, task_info} -> Task.await(task_info)
    end
  end
end

実行結果

iex(1)> ParallelSum.TaskConf.main_validater
** (ArgumentError) task %Task{owner: #PID<0.203.0>, pid: #PID<0.204.0>, ref: #Reference<0.1202030419.853016577.25359>} must be queried from the owner but was queried from #PID<0.201.0>
    (elixir) lib/task.ex:594: Task.await/2

つまりは、「俺を生成したownerじゃないと結果が受け取れないで?」ということ。ベンチマークの際に実行するプロセスはモジュール変数をセットするプロセスと異なるプロセスであり、結果の受け取りが不可能なのではないかということに気づいた。

モジュール変数に任意の構造体は格納不可能

Taskは構造体であり、以下の様なデータになる。

%Task{
    owner: #PID<0.164.0>,
    pid: #PID<0.169.0>,
    ref: #Reference<0.1718095132.2719481858.113828>
  }

先ほどのownerもTask構造体の内部に記録されているが、Task.async()の戻り値はTask構造体であるため、モジュール変数に格納することが出来ない。モジュール変数に格納可能なデータ構造は以下のみ。

  • lists
  • tuples
  • maps
  • atoms
  • numbers
  • bitstrings
  • PIDs and remote functions in the format &Mod.fun/arity

「PID」ならいいんや。ということでspawn()によるプロセス生成とmessage passingの方針を採用したというわけになる。

参考文献

【並行処理vs逐次処理】Elixirで実装した並行和と逐次和をベンチマーク測定をして比較した結果【完全敗北】

前回までのお話

「並行コンピューティング技法」という書籍を読み進める中で「並行和」という並行処理にてデータの要素の合計値を求めるアルゴリズムElixirを用いて実装してみた。何とか動くところまで作れたものの、実行速度がどれだけ逐次処理と異なるかは実行してみないと分からない。そこでベンチマーク測定をするためのログを出力すると決めて1週間が経過していた。

前回のお話はこちらから

www.okb-shelf.work

ベンチマーク測定の実装

以前、Yamazakiさんよりご紹介頂いた「Benchfella」を使用しました。ご紹介頂きまして有難うございます。めちゃくちゃ手軽で使いやすかった。

githubREADME.mdに従い、実装を進めていく。
github.com

プロジェクトディレクトリの直下にbenchという名前のディレクトリを作成。合わせて、このディレクトリの直下にベンチマーク用のファイルを用意する。ベンチマーク測定に使用するモジュールを記述するファイルには_bench.exsを必ず付ける。golang_test.goと同じで対象ファイルを探索するために使用していると思われる。

以下のファイルを作成してベンチマークを行う準備をする。
./parallel_sum/bench/sum_bench.exs

defmodule ParallelSum.Bench do
  @moduledoc """
    実行時間などを外部モジュール'benchfella'を用いて計測を行うためのモジュール
  """
  use Benchfella
  @recursive "recursive"
  @enum "enum"
  @dataset_100 ParallelSum.DataCreater.create_N_size_list(100)

  # data size 100 -------------------------------------------
  ## recursive
  bench "parallel: recursive 100" do
    ParallelSum.bench_parallel_exec(@dataset_100, @recursive)
  end

  bench "serial: recursive 100" do
    ParallelSum.bench_serial_exec(@dataset_100, @recursive)
  end

  ## enum
  bench "parallel: enum 100" do
    ParallelSum.bench_parallel_exec(@dataset_100, @enum)
  end

  bench "serial: enum 100" do
    ParallelSum.bench_serial_exec(@dataset_100, @enum)
  end
end

一部を抜粋。ベンチマーク測定に使用する処理の最終値は毎回同じである必要があり、要は検証データは使い回しする必要がある。前回の記事を参照してもらえれば分かるが、今の並行和の実装はpipeline関数内部でランダム値を使用したデータを生成しているため、このままではベンチマーク測定が出来ない。そのため、ベンチマーク用に並行和、逐次和、それぞれに専用の関数を実装した。

./parallel_sum/lib/parallel_sum.ex

defmodule ParallelSum do
  @moduledoc """
    メインモジュール。他モジュールにて抽象化した関数群をpipeline化する
  """
  @doc """
    benchfella用の実行関数(算出結果が異なるとerrorになるため)
  """
  def bench_parallel_exec(_, ""), do: :error
  def bench_parallel_exec([], _), do: :error
  def bench_parallel_exec(dataset, mode) do
    # 立ち上げるプロセス数を算出
    process_num = ParallelSum.Scheduler.calc_total_process(length(dataset))
    # プロセスを立ち上げて並行和を算出
    parallel_sum = ParallelSum.Scheduler.start_task(
      mode,
      ParallelSum.Scheduler.list_spliter(dataset),
      process_num
    )
    Enum.sum(parallel_sum)
  end

  @doc """
    benchfella用の実行関数(算出結果が異なるとerrorになるため)
  """
  def bench_serial_exec(_, ""), do: :error
  def bench_serial_exec([], _), do: :error
  def bench_serial_exec(dataset, mode) do
    _serial_exec(mode).(dataset)
  end
end

これで準備は完了。あとはドキュメントに従ってベンチマークを実行するのみ。

ベンチマークの実行方法について

プロジェクトディレクトリの直下に移動して、以下コマンドを実行。その前に、ファイル変更がある場合はmix compileをお忘れなく。

$ mix bench

Settings:
  duration:      1.0 s

## ParallelSum.Bench
[20:37:04] 1/4: parallel: enum 100
[20:37:07] 2/4: parallel: recursive 100
[20:37:11] 3/4: serial: enum 100
[20:37:13] 4/4: serial: recursive 100

Finished in 10.74 seconds

## ParallelSum.Bench
benchmark name           iterations   average time 
serial: recursive 100       1000000   1.49 µs/op
serial: enum 100            1000000   1.60 µs/op
parallel: recursive 100       50000   57.99 µs/op
parallel: enum 100            50000   60.18 µs/op

終了後の全体ログを出力しているが、実行中には現在の進行度合いがリアルタイムに出力されるため非常に便利。ログの内容もシンプルで分かりやすい。

# ベンチマーク実行までにかかった時間
Settings:
  duration:      1.0 s
# 経過ログ
## ParallelSum.Bench
[20:37:04] 1/4: parallel: enum 100
[20:37:07] 2/4: parallel: recursive 100
[20:37:11] 3/4: serial: enum 100
[20:37:13] 4/4: serial: recursive 100
# ベンチマーク終了後に出力される結果
## ParallelSum.Bench
benchmark name           iterations   average time 
serial: recursive 100       1000000   1.49 µs/op
serial: enum 100            1000000   1.60 µs/op
parallel: recursive 100       50000   57.99 µs/op
parallel: enum 100            50000   60.18 µs/op

合わせて、ベンチマークの結果をbench直下のsnapshotsというディレクトリに自動で保存してくれる。ファイル形式が.snapshopという独自形式だが、ただのテキスト(binary)のようなのでVSCodeでも問題なく閲覧出来た。

./parallel_sum/bench/snapshots/_.snapshots

duration:1.0;mem stats:false;sys mem stats:false
module;test;tags;iterations;elapsed
ParallelSum.Bench   parallel: enum 100      50000   3009104
ParallelSum.Bench   parallel: recursive 100     50000   2899529
ParallelSum.Bench   serial: enum 100        1000000 1596746
ParallelSum.Bench   serial: recursive 100       1000000 1491282

他にもベンチマーク実行前、実行後に行いたい処理を指定したり、複数のベンチマークを比較させたりと対応範囲が広いなぁという印象。詳しくはREADNE.mdを見てください。個人的にアツいと思ったのは.snapshotファイルからグラフを描画できる点。

$ mix bench.graph

Wrote bench/graphs/index.html

f:id:takamizawa46:20200223211558p:plain:w450
(別のベンチマークの結果。ファイルを誤って削除してしまった)

$ open bench/graphs/index.htmlとコマンドを実行してブラウザから生成されたグラフを閲覧することが出来る。凄い。さて、ベンチマークの実行方法も分かったのでベンチマークをデモのものから書き換える。

いざベンチマーク: 逐次処理vs並行処理

少し行数が多くなったのでリンクを貼っておきますが、内容は対したものではなく、ただデータ数と集計方法を変えてベンチマークを実行しているだけ。一般化する良い方法が思いつかなかったので、別にケース分けるならということでヨシ!とした。また、最大要素数は100,000としているが特に理由はなく、何となくそれ以上の要素数にするとベンチマークが終わらないのではないかと思ったからだ。
github.com

いざ、ベンチマークを実行!!!

$ mix bench

Settings:
  duration:      1.0 s

## ParallelSum.Bench
[14:03:33] 1/20: parallel: enum 100
[14:03:35] 2/20: parallel: enum 1000
[14:03:39] 3/20: parallel: enum 10000
[14:03:41] 4/20: parallel: enum 100000
[14:03:45] 5/20: parallel: enum 1000000
[14:09:00] 6/20: parallel: recursive 100
[14:09:02] 7/20: parallel: recursive 1000
[14:09:04] 8/20: parallel: recursive 10000
[14:09:06] 9/20: parallel: recursive 100000
[14:09:07] 10/20: parallel: recursive 1000000
[14:13:30] 11/20: serial: enum 100
[14:13:38] 12/20: serial: enum 1000
[14:13:42] 13/20: serial: enum 10000
[14:13:46] 14/20: serial: enum 100000
[14:13:50] 15/20: serial: enum 1000000
[14:13:54] 16/20: serial: recursive 100
[14:14:03] 17/20: serial: recursive 1000
[14:14:07] 18/20: serial: recursive 10000
[14:14:11] 19/20: serial: recursive 100000
[14:14:15] 20/20: serial: recursive 1000000

Finished in 646.13 seconds

## ParallelSum.Bench
benchmark name               iterations   average time 
serial: enum 100               10000000   0.76 µs/op
serial: recursive 100          10000000   0.80 µs/op
serial: recursive 1000           500000   6.49 µs/op
serial: enum 1000                500000   6.60 µs/op
parallel: recursive 100           50000   22.68 µs/op
parallel: enum 100                50000   33.83 µs/op
serial: recursive 10000           50000   64.08 µs/op
serial: enum 10000                50000   65.76 µs/op
parallel: recursive 1000          10000   243.17 µs/op
parallel: enum 1000               10000   349.81 µs/op
serial: recursive 100000           5000   641.54 µs/op
serial: enum 100000                5000   647.33 µs/op
serial: recursive 1000000           500   6415.18 µs/op
serial: enum 1000000                500   6530.90 µs/op
parallel: recursive 10000           100   10893.83 µs/op
parallel: enum 10000                100   17106.37 µs/op
parallel: recursive 100000            1   1575123.00 µs/op
parallel: enum 100000                 1   4650929.00 µs/op
parallel: recursive 1000000           1   262729181.00 µs/op
parallel: enum 1000000                1   314724083.00 µs/op

めっちゃ時間かかった。このままだと非直感的で分かりにくいのでjupyter notebookPython使ってグラフ化。

並行処理(parallel)

f:id:takamizawa46:20200223211206p:plain

f:id:takamizawa46:20200223211212p:plain

逐次処理(serial)

f:id:takamizawa46:20200223211216p:plain

f:id:takamizawa46:20200223211221p:plain

「ん?全然差がないやん?」と思うかもしれないが実はそうではなく、実際は数値からも見て分かる通りかなり平均処理時間に大きな差がある。単純にPythonmatplotlib.pyplot力がなくてy軸最大値を統一していないのが悪い、というか比較するグラフで単位(10n)が異なるのどうなのかと思うが、並行処理(parallel)の方が10^8であり、逐次処理(serial)の方が10^3になっており、天と地ほどの差がある。2つのグラフを並べて表示させてみたが、数値に差がありすぎて逐次処理の実行時間の値が表示されないという悲しすぎる問題に直面した。

なぜ並行処理の方がこんなにも遅いのか

まずは結果を重く受け止める。正直、データ量がどのあたりで並行処理が逐次処理を上回るかなとワクワクしていたが幻想をぶち壊されてしまった。何がこうも処理を遅くさせているのか。「並行コンピューティング技法」の書籍から得た知識から推測するに「オーバーヘッド」部分がかなりの負担となっていると考えられる。ここでいうオーバーヘッドにあたる処理は以下の3つ。

  • データを要素N個ずつのチャンクに分割
  • プロセスの立ち上げとチャンク割り当て & 合計処理の実行と終了確認 -> プロセスの停止
  • 各プロセスが算出した値をメインプロセスにて合計

チャンク分割が上手く出来ていないのだろうか。
Enum.chunk_every()使ってるだけで記述の雑みの問題はないかと思うが、単純な疑問として100,0000もの要素を持つ配列を分割するのにどれだけの時間が掛かるのだろう。index=0から順に要素を見ていき、index=10になったところで配列を分割。このような再帰処理を行うとすればO(N)の計算量がかかるはず。プロセスの起動と終了に関してもElixirTaskを使っているだけで、改善を見込めそうにない。
現状の手札だけでは打つ手がない。

改善方法について

もっと細かくベンチマークを行い、どこのオーバーヘッドがボトルネックになりうるのかを洗い出すところから始めてみる。今回の成果物としてはベンチマークを実行する方法を身につけた事と、並行処理と逐次処理を比べるという人生初体験が出来たということで、次にその改善をするチャンスを得られたということにしておく。必ず、特定条件以下では並行処理で逐次処理を倒してやる。

参考文献