やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

【第17回清流elixir勉強会】Elixirを用いた並行和アルゴリズムの実装

トピック

あけましておめでとうございます。1月2月は色々と忙しくて、全く清流elixirにて勉強会を開催できず...
2ヶ月ぶりの開催となりましたが常連の方、新規の方、リモートでの接続、多くの方に参加して頂けました。ありがとうございました。ハッピーターンを持ち合わせていったのですが気づいたら自分が半分以上食べてしまっていました。忍びない..

elixir-sr.connpass.com

清流elixir-infomation
開催場所: 丸の内(愛知)
参加人数: 3 -> 6 update!
コミュニティ参加人数 : 37 -> 40
2020/02/14現在

第17の勉強会について

今回は新年最初の勉強会であったため、メインテーマがお恥ずかしながら未定であったため、「もくもく会」という形で勉強会を開催しました。各々、自身の取り組みたい内容について約2時間、取り組みました。僕は最近読み始めた「並行コンピューティング技法」の書籍で第6章で紹介されている「並行和」というアルゴリズムの実装をElixirで挑戦してみました。書籍にはOpenMPとPthreadでのコードが記載されているのですが中々のボリュームな上、OpenMPやPthreadは計算機科学の分野で広く仕様されるライブラリため親しみが全くありません。これを「Elixirで書き換えたらどんなもんやろなぁ」という単純な興味からのスタートです。

www.okb-shelf.work

実装したコードはこちら
github.com

並行和について

詳しくは「並行コンピューティング技法 第6章のまとめ」という記事を後日出し、そちらで解説を行うが、ざっくりとした説明をしておく。何かしらのデータが手元に存在しているとして、それはN個の要素を持つ配列だと考える。このN個の要素を持つ配列をM個の間隔でデータを分割してチャンクを作成する。作成したチャンクをチャンク数分だけ用意したスレッド、もしくはプロセスの内部で配列の要素を合計した数値を算出する。そして、それぞれのスレッド、もしくはプロセスによって合計された値をメインのスレッド、もしくはプロセスにて集約する。この一連の処理を「並行和」と言っている。

# N個(今回はN=10)の要素を持つ配列を用意
[1,2,3,4,5,6,7,8,9,10]

# 用意したデータをM(今回はM=2)の間隔で分割してチャンクを作成
[
  [1,2],
  [3,4],
  [5,6],
  [7,8],
  [9,10]
]

# 作成したチャンクをそれぞれのスレッド、もしくはプロセスに割り当てて合計値を算出する
[
  [1+2],
  [3+4],
  [5+6],
  [7+8],
  [9+10],
]

# それぞれのスレッド、もしくはプロセスによって算出された合計値をメインのスレッド、もしくはプロセスにて合計
[3 + 7 + 11 + 15 + 19] -> 55

この一連の処理をElixirを用いて作成していく。

暗黙の仕様について

手元に丁度、頃合いのデータがないこともあり、作成するデータの性質と自動でスケールさせるかについて実装前に決めておく。以下の仕様(といっても、作成するのは自分だけなので暗黙の仕様ということで)を実装にあたり遵守する。

  • データ分割による並行化を行い、データのサイズによって自動でスケール(起動させるプロセス数をコントロール)するようにする
  • データはN個の要素を持つ配列を採用。要素はint型の上限値を定めて生成するランダム値

この勉強会の2時間の間で何とか動く所までは実装することが出来た。といっても、並行処理に関してはElixirは実装されているモジュールが優秀なため、ほとんど自前で作成したものはなく、ほとんどがwrap関数とhelper関数になった。さすがのElixir

並行処理の起動までの流れ

実装についてはgithubの方を参照して頂ければ良いので全体の処理の流れを解説しておく。

  • N個のランダム値の要素を持つ配列を用意
  • M個の間隔でチャンクに分割
  • 分割したチャンクをプロセスの専属データとして、プロセスを新規で立ち上げる
  • それぞれのプロセスにて合計値(sum)を算出
  • メインのプロセスにて各プロセスの合計値を合計

この一連の処理をまとめているのが以下のファイル
./parallel_sum_ex/lib/parallel_sum.ex

defmodule ParallelSum do
  @moduledoc """
    メインモジュール。他モジュールにて抽象化した関数群をpipeline化する
  """

  @doc """
    並行処理によって合計値を算出するpipeline関数
  """
  def parallel_exec(_, ""), do: :error
  def parallel_exec([], _), do: :error
  def parallel_exec(data_size, mode)  do
    # ランダム値を要素に持つリストを分割したチャンクデータを作成
    dataset =
      ParallelSum.DataCreater.create_N_size_list(data_size)
      |> ParallelSum.Scheduler.list_spliter()

    # 立ち上げるプロセス数を算出
    process_num = ParallelSum.Scheduler.calc_total_process(data_size)

    # プロセスを立ち上げて並行和を算出
    parallel_sum = ParallelSum.Scheduler.start_task(mode, dataset, process_num)
    Enum.sum(parallel_sum)
  end
end

別ファイルにて定義したモジュール内関数を利用して先ほどの流れを実行していく。自動スケールについてだが、スケール、すなわち立ち上げるプロセス数は全データのサイズをconfigにて設定した1プロセスが扱える最大要素数の数によって決定される。算出式はめちゃくちゃシンプルで全データ数を1プロセセスが扱える最大要素数で割るだけ。仮にデータ数が1000であり、最大要素数が100であれば10個のプロセスを立ち上げることになる。

./parallel_sum_ex/lib/scheduler.ex

defmodule ParallelSum.Scheduler do
  @moduledoc """
    データのチャンク分割とプロセスの起動の一連のスケジューリングを実行するための関数
    他プロセスによるタスク実行はmessage passingでやると手間なのでTaskに任せる
  """

  @doc """
    configに設定された1chunkが扱える最大要素数を元に立ち上げるべきプロセスの総数を算出する
    processs_num = 配列の要素数 / 1チャンク辺りの扱える要素の最大値
  """
  def calc_total_process(data_size) do
    each_chunk_size_max = ParallelSum.Config.each_chunk_list_size_limit()
    div(data_size, each_chunk_size_max)
  end
end

少しだけテクいことをしているとすれば、Taskモジュールを仕様したタスクを持つプロセスの作成と実行結果の受け取り部分。過去に何度も同じことをやったことがあるが、Task.async()に実行する無名関数を渡して、生成されたプロセスのpidを要素に持つリストを作成する。パイプライン演算子を仕様して、立ち上げたプロセスの実行結果をTask.await()とすることで受け取っていく。割と頻出の処理なので新規性は特にない(はず)。

Taskの使い方は過去の記事にまとめているのでご参照ください。

www.okb-shelf.work

再帰関数とEnum.sum()を使った際でパフォーマンスに差が出来るのかを後々、測定したいので第1引数に対してパターンマッチを行い、実行する無名関数を変化させている。あいかわらずパターンマッチ半端ないって。
./parallel_sum_ex/lib/scheduler.ex

defmodule ParallelSum.Scheduler do
  @moduledoc """
    データのチャンク分割とプロセスの起動の一連のスケジューリングを実行するための関数
    他プロセスによるタスク実行はmessage passingでやると手間なのでTaskに任せる
  """

  @doc """
    分割されたデータと算出された立ち上げるプロセス数を元にTask moduleを起動して再帰関数によって集計処理を実行
    各プロセスが算出した合計値がリストになって返ってくる
  """
  def start_task(_, lst, _) when length(lst) == 0, do: :error
  def start_task(_, _, 0), do: :error
  def start_task("", _, _), do: :error

  def start_task("recursive", splited_lst, process_num) do
    # プロセスを起動し、チャンクを割り当て
    Enum.map(1..process_num, fn index ->
      Task.async(fn -> ParallelSum.Sum.recurcive_sum(Enum.at(splited_lst, index-1)) end)
    end)
    |> Enum.map(fn task -> Task.await(task) end)
  end

  def start_task("enum", splited_lst, process_num) do
    # プロセスを起動し、チャンクを割り当て
    Enum.map(1..process_num, fn index ->
      Task.async(fn -> ParallelSum.Sum.sum(Enum.at(splited_lst, index-1)) end)
    end)
    |> Enum.map(fn task -> Task.await(task) end)
  end
end

Usage

今回は実行結果を確認する部分までしか実装できなかった。本当はメモリ使用率や経過時間なども出力して、グラフ化して色々と満たされたいのだがご愛嬌。実行にはElixirがインストール済みである必要がある。

# iexを立ち上げる
> $ iex -S mix


# N -> 配列の要素数
# mode -> sumの集計方法(recurcive -> 再帰処理, enum -> Enum.sum() のどちらか)

# 並行和(複数プロセスによる算出)
iex> ParallelSum.parallel_exec(N, mode)

# 逐次和(単一プロセスによる算出)
iex> ParallelSum.serial_exec(N, mode)

実行結果

iex(1)> ParallelSum.parallel_exec(100_000, "recursive")
50084347

iex(2)> ParallelSum.parallel_exec(100_000, "enum")     
50113202

iex(3)> ParallelSum.serial_exec(100_000, "recursive")
50099269

iex(4)> ParallelSum.serial_exec(100_000, "enum")     
50106982

実装して気づいたけど、明らかに逐次処理の方が速い。何でだろう...🤔
プロセスの立ち上げやデータ参照等がオーバーヘッドになっているということだろうけど厳密な検証は実行時間などを確認出来る様にしてから行なっていこう

参加者の方々の成果物

確認できるものを一部、参照させて頂きます。

kikuyutaさん

自分はハードウェアはさっぱりですが、内部処理の仕組みについて教えて頂きました。

GKBR56さん
ElixirでMMORPGを作られている方です。youtubeで実際に動くところを確認することが出来ます。オンラインゲームの仕組みがどうなっているか自分はさっぱりですが、最初はシンプルなechoサーバーを作るところから始めたと伺いました。

https://www.youtube.com/watch?v=nwxHhOcTHBw


mmo.ex デモ動画

近日、新たな動画を公開予定だそうです。Qiitaの記事は以下を参照下さい
qiita.com