やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

【並行処理vs逐次処理】プロセスを事前に立ち上げることによって高速化されるのか【めちゃ速】

前回の話

「並行コンピューティング技法」を読み進める中で「並行和」の実装をElixirを用いて行なった。何とか実装が完了した並行和の処理を複数プロセスを立ち上げてチャンクを分割して実行する並行処理(parallel)と、チャンクを分割せずにメインの1つのプロセスのみで実行する逐次処理(serial)と比較した所、何と並行処理よりも逐次処理の方が約10000倍も速いという残念な結果に終わってしまった。完全敗北。

www.okb-shelf.work

どうすればパフォーマンスが向上するか

この旨を呟いた所、Yamazakiさんよりアドバイスを頂きました。(いつも有難うございます🙇‍♂️)

なるほど...。プロセスの起動と終了がボトルネックになり得るとは...。
Elixirは気軽にプロセス生成が強みの1つであり、ユースケースの1つとして個人的に考えているのは「大量のデータをチャンクに分割し、チャンク毎に対応するプロセスを立ち上げて前処理をゴリゴリと行う」という使い方。しかしながら、プロセス生成がボトルネックになるというのは中々につらい。
逆に言えば、サーバー起動時に常駐のプロセスが立ち上がり、それぞれがタスクを担うような処理に向いているということにはなる。

今回は実際にプロセスの起動と終了の内、実行時のプロセス起動がどれだけパフォーマンスに影響を与えているかを確認するために、あらかじめプロセスをベンチマーク測定前に立ち上げた状態で並行和の処理を行うように前回のベンチマークを改良してみた。

ベンチマークの実装

(結果が気になる方はこの章はすっ飛ばしてください)
コードの全体像はこちら
github.com

プロセスを実行時前に立ち上げることによって様々な問題が発生することが実装を進める中で次々に発覚して、ほとんど新しく書き直すこととなった。まずベンチマークに使用する検証用データ同様にプロセスを立ち上げて得られるプロセスのPIDをモジュール変数に保持させておく必要がある。

defmodule Sample do
  @processes_pid [1,2,3....]
end

この部分でかなりやられた。まず第一に今までプロセスの起動と実行終了結果を受け取るために使用してTaskが使用出来ないことが分かり、Elixirmessage passingを利用してフルスクラッチで実装する方針を取った。(Taskが使えない理由についてはおまけにて解説)
そうなると必然的に、プロセス間でのmessage passingをコントロールする必要があるため、

  • メインプロセス
  • 実行結果を受け取る受信用のプロセス -> 全てのプロセスの終了を確認したらメインプロセスに結果を送信
  • タスクを実行するN個(N -> データ数をチャンク数で割った数)のプロセス -> 終了次第、受信用プロセスに結果を送信

このように各プロセスの用意とmessageの定義をすることになった。詳しくは解説しないが、ざっくりとコードの核部分だけを見せておく。

受信用プロセスのための関数
./parallel_sum/lib/sum.ex

@doc """
    生成したプロセスによって算出した合計値を再帰関数のアキュムレーターに記録
  """
  def receiver(process_num, pid) do
    sum = _receiver(process_num, [])
    send(pid, {:ok, sum})
  end
  defp _receiver(0, accum), do: accum
  defp _receiver(process_num, accum) do
    receive do
      {:ok, sum} -> _receiver(process_num-1, [sum] ++ accum)
    # 指定時間以上経過した場合にプロセスを停止して終了
    after 1000 * ParallelSum.Config.waiting_limit_time_for_process() ->
      Logger.warn("[receiver] time over. so kill process")
    end
  end

起動時に生成するプロセス数と全ての結果の受信が完了した際にメインプロセスに結果を集約した結果を送信するためにメインプロセスのPIDを引数に持つ。再帰的に処理を行い、各プロセスから送信されてきた結果をアキュムレーターにて保持。カウンターの値が0になった時にメインのプロセスに最終的な結果を送信する。

集計を行うプロセスのための関数
./parallel_sum/lib/sum.ex

@doc """
    コマンドメッセージを受信するまで算出処理を行わないwrap関数
    チャンクはプロセス起動時に引数で受け取る
  """
  def waiting_sum(lst) when is_list(lst) do
    receive do
      {:recursive, pid} ->
        sum = recurcive_sum(lst)
        send(pid, {:ok, sum})
        # プロセスが停止するとベンチが行えなくなるため再帰
        waiting_sum(lst)
      {:enum, pid} ->
        sum = sum(lst)
        send(pid, {:ok, sum})
        # プロセスが停止するとベンチが行えなくなるため再帰
        waiting_sum(lst)
        # 指定時間以上経過した場合にプロセスを停止して終了
    after 1000 * ParallelSum.Config.waiting_limit_time_for_process() ->
      Logger.warn("[waiting_sum] time over. so kill process")
      exit(:time_over)
    end
  end

チャンク分割したデータ(配列)を受け取り、立ち上げるタスクに当たる関数。message passingによってチャンクを受け取る方法も考えたし、後に実装するつもりではあるが、今回はプロセスの事前立ち上げに焦点を当てているため引数経由でプロセス立ち上げ時にチャンクを受け取る方法を採用した。再帰的な処理を行わせているのはかなり特殊な対応で、benchfellaが同じベンチマークを上限時間内に何度も実行するため、再帰的に処理が行われるようにしておかねば2度目のベンチマーク時には事前に立ち上げたプロセスが誰一人、存在していないという状態になってしまうからになる。

肝となる部分がこの関数で、事前に立ち上げたプロセスがそれぞれ、集計開始のメッセージを受け取るまで待機するように作成してある。{:recursive, pid}{:enum, pid}というメッセージを受け取ることで初めて並行和の処理を開始する。

メインプロセスのための関数
./parallel_sum/bench/init_sum_bench.exs

# 共通処理を関数化
  def bench_helper(process_info, type_atom) do
    {process_num, calc_processes} = process_info
    # 受信用のプロセスを起動
    # receiver = spawn(ParallelSum.Sum, :receiver, [process_num, self()])
    self_pid = self()
    receiver = spawn(fn -> ParallelSum.Sum.receiver(process_num, self_pid) end)

    # それぞれのプロセスに算出開始のメッセージを送信
    Enum.each(calc_processes, fn pid ->
      send(pid, {type_atom, receiver})
    end)

    # 算出値を受信して合計
    receive do
      {:ok, sum} -> Enum.sum(sum)
    end
  end

立ち上げたプロセス情報(プロセス数, PIDを保持した配列)を受け取り、「算出値の受信用プロセス」を起動させて、「集計処理のために引数経由でチャンクを受け取り起動済みのプロセス」それぞれに対してPIDを指定して集計開始のmessageを送信する。そして、最終的に各プロセスから受け取った結果をまとめたmessageを受信用のプロセスから受け取り、メインプロセスにて値を合計して終了する。

モジュール変数で使用している関数はこちら。大したことはしていないのでコードのリンクを貼っておきます。
github.com

いざ測定

ベンチマークを行行う前に設定条件について説明しておく。配列の要素数100万程度(10Nのrangeで実行)の場合にベンチマークが制限時間内に終了しない問題にぶち当たり、これはコンピューターのスペック的に厳しいと判断。今回のベンチマークの配列の要素数の最大数は10万とし、1プロセスあたりが扱うチャンクの要素数100としたので、並行和を実行するプロセスは最大で1万個生成されたということになる。普通にめっちゃ多くね。

実行したベンチマークはこちら
github.com

設定条件の一覧

項目 設定値
最大配列要素数 100,000
生成する要素値の最大数(int) 1,000
1プロセスあたりの最大要素数 100
最大生成プロセス数 10,000
タスク実行最長許容時間 20seconds

結果

再帰関数を使用して並行和を算出した結果
f:id:takamizawa46:20200306230052p:plain

Enum.sum()を使用して並行和を算出した結果
f:id:takamizawa46:20200306230107p:plain

前回の結果(プロセスを事前立ち上げしないケース)

f:id:takamizawa46:20200223211206p:plain f:id:takamizawa46:20200223211212p:plain

.snapshot

ParallelSum.InitBench parallel: enum 100        500000 2555564
ParallelSum.InitBench  parallel: enum 1000       100000 2902751
ParallelSum.InitBench  parallel: enum 10000      10000  1819477
ParallelSum.InitBench  parallel: enum 100000     1000   1781611
ParallelSum.InitBench  parallel: recursive 100       500000 2517027
ParallelSum.InitBench  parallel: recursive 1000      100000 2878593
ParallelSum.InitBench  parallel: recursive 10000     10000  1834528
ParallelSum.InitBench  parallel: recursive 100000        1000   1710872

数値での比較

前回

算出方法 データ数 1回あたりの実行時間(µs/op)
enum 100 33.83112
enum 1000 349.8089
enum 10000 17106.37
enum 100000 4650929
recursive 100 22.67772
recursive 1000 243.1725
recursive 10000 10893.83
recursive 100000 1575123

今回

算出方法 データ数 1回あたりの実行時間(µs/op)
enum 100 5.111128
enum 1000 29.02751
enum 10000 181.9477
enum 100000 1781.611
recursive 100 5.034054
recursive 1000 28.78593
recursive 10000 183.4528
recursive 100000 1710.872

素数が10万の時での比較

算出方法 前回 今回 何倍速くなったか
enum 4650929 1781.61 約2610倍
recursive 1575123 1710.872 約920倍

考察と感想

確かにアドバイス頂いた通りにElixirではプロセスの起動部分が、並行化におけるかなりのオーバーヘッドになっているのはこの結果からも明らか。それにしても驚いた。こんなに速くなるものとは...。あと気になるのはチャンク分割がどれだけのオーバーヘッドになり得るかということで、事前にプロセスだけを立ち上げておいて、message passingにてチャンクデータを受け取る方式でのベンチマークも非常に気になる。

本来ならば別言語実装の同内容のベンチマークと結果を比較するべきだろうが、並行処理を初めて1年にも満たない人間が、気軽にプロセスを立ち上げて、気軽に並行処理を行えるElixirのストレスフリーなデザインは凄い。

またElixirで並行処理のパフォーマンスを活かすためには下記のような場合に適しているのではないか。

  • 実行時前にプロセスを立ち上げられる設計がされている -> eg: サーバー起動時にプロセスを立ち上げ
  • 事前にプロセスを立ち上げるべき目的になっている -> eg: 常駐プロセス

Elixir(Erlang)の採用事例を数多く見てきたが、GateWayであったり、stream serverであったり、プッシュ通知であったり...と、今回得られた知見とフィットしている。

次はmessage passingによってチャンクデータを受け取る方法でのベンチマークの測定を目指すこととする。

おまけ: なぜTaskが使えなかったのか

まず以下のログからTaskasync()で起動された後に、await()で結果を取得するまでの間に待機状態となるのではなく、適宜処理を実行していることが分かる。

iex(1)> Task.async(fn -> IO.puts("nice boat") end)
nice boat
%Task{
  owner: #PID<0.104.0>,
  pid: #PID<0.106.0>,
  ref: #Reference<0.3871736042.2998665217.74532>
}

これは特に問題ないのだが、致命的な問題が2つある。

Task.await()が実行出来るプロセスの縛り

以下の検証用モジュールを用意してmain_validaterを実行するとerrorになる。どうやら公式ドキュメントにもある通り、Task.await()Task.async()を用いてTaskを生成したPIDを持つプロセス内部でしか実行出来ない様だ。プロセスがリンクされているのだろうか。

When invoked, a new process will be created, linked and monitored by the caller. Once the task action finishes, a message will be sent to the caller with the result.

defmodule ParallelSum.TaskConf do
  @moduledoc """
    Taskモジュールを使用して生成されたプロセスは生成したプロセスによってしか回収できないかを確認
  """

  @doc """
    Taskを生成してmessage passing形式で返す
  """
  def create_task(pid) do
    :timer.sleep(1000)
    task_info = Task.async(fn -> :ok end)
    send(pid, {:ok, task_info})
  end

  @doc """
    自身のpidを用いてTask.awaitが問題なく実行可能かどうか
  """
  def main_validater() do
    pid = self()
    spawn(fn -> create_task(pid) end)
    receive do
      {:ok, task_info} -> Task.await(task_info)
    end
  end
end

実行結果

iex(1)> ParallelSum.TaskConf.main_validater
** (ArgumentError) task %Task{owner: #PID<0.203.0>, pid: #PID<0.204.0>, ref: #Reference<0.1202030419.853016577.25359>} must be queried from the owner but was queried from #PID<0.201.0>
    (elixir) lib/task.ex:594: Task.await/2

つまりは、「俺を生成したownerじゃないと結果が受け取れないで?」ということ。ベンチマークの際に実行するプロセスはモジュール変数をセットするプロセスと異なるプロセスであり、結果の受け取りが不可能なのではないかということに気づいた。

モジュール変数に任意の構造体は格納不可能

Taskは構造体であり、以下の様なデータになる。

%Task{
    owner: #PID<0.164.0>,
    pid: #PID<0.169.0>,
    ref: #Reference<0.1718095132.2719481858.113828>
  }

先ほどのownerもTask構造体の内部に記録されているが、Task.async()の戻り値はTask構造体であるため、モジュール変数に格納することが出来ない。モジュール変数に格納可能なデータ構造は以下のみ。

  • lists
  • tuples
  • maps
  • atoms
  • numbers
  • bitstrings
  • PIDs and remote functions in the format &Mod.fun/arity

「PID」ならいいんや。ということでspawn()によるプロセス生成とmessage passingの方針を採用したというわけになる。

参考文献

【並行処理vs逐次処理】Elixirで実装した並行和と逐次和をベンチマーク測定をして比較した結果【完全敗北】

前回までのお話

「並行コンピューティング技法」という書籍を読み進める中で「並行和」という並行処理にてデータの要素の合計値を求めるアルゴリズムElixirを用いて実装してみた。何とか動くところまで作れたものの、実行速度がどれだけ逐次処理と異なるかは実行してみないと分からない。そこでベンチマーク測定をするためのログを出力すると決めて1週間が経過していた。

前回のお話はこちらから

www.okb-shelf.work

ベンチマーク測定の実装

以前、Yamazakiさんよりご紹介頂いた「Benchfella」を使用しました。ご紹介頂きまして有難うございます。めちゃくちゃ手軽で使いやすかった。

githubREADME.mdに従い、実装を進めていく。
github.com

プロジェクトディレクトリの直下にbenchという名前のディレクトリを作成。合わせて、このディレクトリの直下にベンチマーク用のファイルを用意する。ベンチマーク測定に使用するモジュールを記述するファイルには_bench.exsを必ず付ける。golang_test.goと同じで対象ファイルを探索するために使用していると思われる。

以下のファイルを作成してベンチマークを行う準備をする。
./parallel_sum/bench/sum_bench.exs

defmodule ParallelSum.Bench do
  @moduledoc """
    実行時間などを外部モジュール'benchfella'を用いて計測を行うためのモジュール
  """
  use Benchfella
  @recursive "recursive"
  @enum "enum"
  @dataset_100 ParallelSum.DataCreater.create_N_size_list(100)

  # data size 100 -------------------------------------------
  ## recursive
  bench "parallel: recursive 100" do
    ParallelSum.bench_parallel_exec(@dataset_100, @recursive)
  end

  bench "serial: recursive 100" do
    ParallelSum.bench_serial_exec(@dataset_100, @recursive)
  end

  ## enum
  bench "parallel: enum 100" do
    ParallelSum.bench_parallel_exec(@dataset_100, @enum)
  end

  bench "serial: enum 100" do
    ParallelSum.bench_serial_exec(@dataset_100, @enum)
  end
end

一部を抜粋。ベンチマーク測定に使用する処理の最終値は毎回同じである必要があり、要は検証データは使い回しする必要がある。前回の記事を参照してもらえれば分かるが、今の並行和の実装はpipeline関数内部でランダム値を使用したデータを生成しているため、このままではベンチマーク測定が出来ない。そのため、ベンチマーク用に並行和、逐次和、それぞれに専用の関数を実装した。

./parallel_sum/lib/parallel_sum.ex

defmodule ParallelSum do
  @moduledoc """
    メインモジュール。他モジュールにて抽象化した関数群をpipeline化する
  """
  @doc """
    benchfella用の実行関数(算出結果が異なるとerrorになるため)
  """
  def bench_parallel_exec(_, ""), do: :error
  def bench_parallel_exec([], _), do: :error
  def bench_parallel_exec(dataset, mode) do
    # 立ち上げるプロセス数を算出
    process_num = ParallelSum.Scheduler.calc_total_process(length(dataset))
    # プロセスを立ち上げて並行和を算出
    parallel_sum = ParallelSum.Scheduler.start_task(
      mode,
      ParallelSum.Scheduler.list_spliter(dataset),
      process_num
    )
    Enum.sum(parallel_sum)
  end

  @doc """
    benchfella用の実行関数(算出結果が異なるとerrorになるため)
  """
  def bench_serial_exec(_, ""), do: :error
  def bench_serial_exec([], _), do: :error
  def bench_serial_exec(dataset, mode) do
    _serial_exec(mode).(dataset)
  end
end

これで準備は完了。あとはドキュメントに従ってベンチマークを実行するのみ。

ベンチマークの実行方法について

プロジェクトディレクトリの直下に移動して、以下コマンドを実行。その前に、ファイル変更がある場合はmix compileをお忘れなく。

$ mix bench

Settings:
  duration:      1.0 s

## ParallelSum.Bench
[20:37:04] 1/4: parallel: enum 100
[20:37:07] 2/4: parallel: recursive 100
[20:37:11] 3/4: serial: enum 100
[20:37:13] 4/4: serial: recursive 100

Finished in 10.74 seconds

## ParallelSum.Bench
benchmark name           iterations   average time 
serial: recursive 100       1000000   1.49 µs/op
serial: enum 100            1000000   1.60 µs/op
parallel: recursive 100       50000   57.99 µs/op
parallel: enum 100            50000   60.18 µs/op

終了後の全体ログを出力しているが、実行中には現在の進行度合いがリアルタイムに出力されるため非常に便利。ログの内容もシンプルで分かりやすい。

# ベンチマーク実行までにかかった時間
Settings:
  duration:      1.0 s
# 経過ログ
## ParallelSum.Bench
[20:37:04] 1/4: parallel: enum 100
[20:37:07] 2/4: parallel: recursive 100
[20:37:11] 3/4: serial: enum 100
[20:37:13] 4/4: serial: recursive 100
# ベンチマーク終了後に出力される結果
## ParallelSum.Bench
benchmark name           iterations   average time 
serial: recursive 100       1000000   1.49 µs/op
serial: enum 100            1000000   1.60 µs/op
parallel: recursive 100       50000   57.99 µs/op
parallel: enum 100            50000   60.18 µs/op

合わせて、ベンチマークの結果をbench直下のsnapshotsというディレクトリに自動で保存してくれる。ファイル形式が.snapshopという独自形式だが、ただのテキスト(binary)のようなのでVSCodeでも問題なく閲覧出来た。

./parallel_sum/bench/snapshots/_.snapshots

duration:1.0;mem stats:false;sys mem stats:false
module;test;tags;iterations;elapsed
ParallelSum.Bench   parallel: enum 100      50000   3009104
ParallelSum.Bench   parallel: recursive 100     50000   2899529
ParallelSum.Bench   serial: enum 100        1000000 1596746
ParallelSum.Bench   serial: recursive 100       1000000 1491282

他にもベンチマーク実行前、実行後に行いたい処理を指定したり、複数のベンチマークを比較させたりと対応範囲が広いなぁという印象。詳しくはREADNE.mdを見てください。個人的にアツいと思ったのは.snapshotファイルからグラフを描画できる点。

$ mix bench.graph

Wrote bench/graphs/index.html

f:id:takamizawa46:20200223211558p:plain:w450
(別のベンチマークの結果。ファイルを誤って削除してしまった)

$ open bench/graphs/index.htmlとコマンドを実行してブラウザから生成されたグラフを閲覧することが出来る。凄い。さて、ベンチマークの実行方法も分かったのでベンチマークをデモのものから書き換える。

いざベンチマーク: 逐次処理vs並行処理

少し行数が多くなったのでリンクを貼っておきますが、内容は対したものではなく、ただデータ数と集計方法を変えてベンチマークを実行しているだけ。一般化する良い方法が思いつかなかったので、別にケース分けるならということでヨシ!とした。また、最大要素数は100,000としているが特に理由はなく、何となくそれ以上の要素数にするとベンチマークが終わらないのではないかと思ったからだ。
github.com

いざ、ベンチマークを実行!!!

$ mix bench

Settings:
  duration:      1.0 s

## ParallelSum.Bench
[14:03:33] 1/20: parallel: enum 100
[14:03:35] 2/20: parallel: enum 1000
[14:03:39] 3/20: parallel: enum 10000
[14:03:41] 4/20: parallel: enum 100000
[14:03:45] 5/20: parallel: enum 1000000
[14:09:00] 6/20: parallel: recursive 100
[14:09:02] 7/20: parallel: recursive 1000
[14:09:04] 8/20: parallel: recursive 10000
[14:09:06] 9/20: parallel: recursive 100000
[14:09:07] 10/20: parallel: recursive 1000000
[14:13:30] 11/20: serial: enum 100
[14:13:38] 12/20: serial: enum 1000
[14:13:42] 13/20: serial: enum 10000
[14:13:46] 14/20: serial: enum 100000
[14:13:50] 15/20: serial: enum 1000000
[14:13:54] 16/20: serial: recursive 100
[14:14:03] 17/20: serial: recursive 1000
[14:14:07] 18/20: serial: recursive 10000
[14:14:11] 19/20: serial: recursive 100000
[14:14:15] 20/20: serial: recursive 1000000

Finished in 646.13 seconds

## ParallelSum.Bench
benchmark name               iterations   average time 
serial: enum 100               10000000   0.76 µs/op
serial: recursive 100          10000000   0.80 µs/op
serial: recursive 1000           500000   6.49 µs/op
serial: enum 1000                500000   6.60 µs/op
parallel: recursive 100           50000   22.68 µs/op
parallel: enum 100                50000   33.83 µs/op
serial: recursive 10000           50000   64.08 µs/op
serial: enum 10000                50000   65.76 µs/op
parallel: recursive 1000          10000   243.17 µs/op
parallel: enum 1000               10000   349.81 µs/op
serial: recursive 100000           5000   641.54 µs/op
serial: enum 100000                5000   647.33 µs/op
serial: recursive 1000000           500   6415.18 µs/op
serial: enum 1000000                500   6530.90 µs/op
parallel: recursive 10000           100   10893.83 µs/op
parallel: enum 10000                100   17106.37 µs/op
parallel: recursive 100000            1   1575123.00 µs/op
parallel: enum 100000                 1   4650929.00 µs/op
parallel: recursive 1000000           1   262729181.00 µs/op
parallel: enum 1000000                1   314724083.00 µs/op

めっちゃ時間かかった。このままだと非直感的で分かりにくいのでjupyter notebookPython使ってグラフ化。

並行処理(parallel)

f:id:takamizawa46:20200223211206p:plain

f:id:takamizawa46:20200223211212p:plain

逐次処理(serial)

f:id:takamizawa46:20200223211216p:plain

f:id:takamizawa46:20200223211221p:plain

「ん?全然差がないやん?」と思うかもしれないが実はそうではなく、実際は数値からも見て分かる通りかなり平均処理時間に大きな差がある。単純にPythonmatplotlib.pyplot力がなくてy軸最大値を統一していないのが悪い、というか比較するグラフで単位(10n)が異なるのどうなのかと思うが、並行処理(parallel)の方が10^8であり、逐次処理(serial)の方が10^3になっており、天と地ほどの差がある。2つのグラフを並べて表示させてみたが、数値に差がありすぎて逐次処理の実行時間の値が表示されないという悲しすぎる問題に直面した。

なぜ並行処理の方がこんなにも遅いのか

まずは結果を重く受け止める。正直、データ量がどのあたりで並行処理が逐次処理を上回るかなとワクワクしていたが幻想をぶち壊されてしまった。何がこうも処理を遅くさせているのか。「並行コンピューティング技法」の書籍から得た知識から推測するに「オーバーヘッド」部分がかなりの負担となっていると考えられる。ここでいうオーバーヘッドにあたる処理は以下の3つ。

  • データを要素N個ずつのチャンクに分割
  • プロセスの立ち上げとチャンク割り当て & 合計処理の実行と終了確認 -> プロセスの停止
  • 各プロセスが算出した値をメインプロセスにて合計

チャンク分割が上手く出来ていないのだろうか。
Enum.chunk_every()使ってるだけで記述の雑みの問題はないかと思うが、単純な疑問として100,0000もの要素を持つ配列を分割するのにどれだけの時間が掛かるのだろう。index=0から順に要素を見ていき、index=10になったところで配列を分割。このような再帰処理を行うとすればO(N)の計算量がかかるはず。プロセスの起動と終了に関してもElixirTaskを使っているだけで、改善を見込めそうにない。
現状の手札だけでは打つ手がない。

改善方法について

もっと細かくベンチマークを行い、どこのオーバーヘッドがボトルネックになりうるのかを洗い出すところから始めてみる。今回の成果物としてはベンチマークを実行する方法を身につけた事と、並行処理と逐次処理を比べるという人生初体験が出来たということで、次にその改善をするチャンスを得られたということにしておく。必ず、特定条件以下では並行処理で逐次処理を倒してやる。

参考文献

【第1回: データ分析xブログ】データ分析の力でPV数は増やせるのか。現状を分析して目標を立てるまで

この記事を書いた理由

GoogleアナリティクスGoogleサーチコンソールといった優秀なツールがあり、使い方を解説する記事が多数ある一方で

  • どう使えばPV数が増えるのか
  • 収益を生み出すには何に注目するのか

といったテーマの記事が非常に少ないことに気づいた。現状(2020/02/16時点)で1日の平均PV数が「10」のこのブログがGoogleアナリティクスGoogleサーチコンソールのデータを使用したデータ分析によってPV数が増えていくのかを記録することが一番の目的になる。

ブログの現状

プログラミング関連(特にElixirという言語)の記事をメインに興味のあることをブログを書いている。2019年の3月にブログを開始して、はや1周年を迎えようとしており、書いた記事数は2020/02/16日時点で「89件」。99%が3ヶ月以内にブログを挫折するらしいのでよく続いているなと自分でも思う。ほほとんど「アウトプットしたい」という気持ちだけで続いている。

:
:

と思っていたが、自分のブログのPV数(ページビュー)を最近、気にするようになって落ち込んでいる。

(バァァァァアアアン)
f:id:takamizawa46:20200217224350p:plain

一年もやっててPV数の合計はだいたい3,600程度。1日のPV数は多くて30~40ぐらいで、低い時は5以下といった具合。つまり何が言いたいのかというと「書いた記事をほとんど見てもらえない」ということ。ショック...

この企画について

普通に不労所得ほしいです

どんだけ良い記事を書いていたと思っていても、見てもらえなければ自己満足のメモ。それはそれで良いけど、PV数が少ないということは収益の可能性も低いということ。2020年の抱負は「何かしらでマネタイズする」であり、不労所得を生み出したい。何故、不労所得が欲しいかというと3つの理由がある。

  • 心の余裕を保つため
  • 時間労働の限界性を感じたから
  • 母親に楽をさせてあげたい

せっかくブログをやっていて独自ドメインも取ってるし、はてなもproで契約してるのなら最低限、ブログの維持費分だけでも収益を出したいところ。

収益を生み出すためには

じゃあ、どうやったら収益って増えるのかを考えると最もシンプルな答えは「PV数を増やすこと」になる。アドセンスなりアフェリエイトなりまずは見てもらえる機会を増やすこと。じゃあ、どうやってPV数って増やせるのかを考えると、答えはめちゃくちゃ複雑で明確な解はない。

とはいえ、ブログで収益を生み出している先駆者の方々は存在しており、実際に結果を出している。こういう時は先駆者の考えや施策を調べまくる。どうやら、彼らが言うには収益を増やすために「キーワードの分析」が非常に重要らしい。

  • どんなキーワードがPV数を稼いでいるのか
  • google検索結果の1ページ目を獲得出来ている記事はどれだけあるか

など...

しかしながら、この辺りのノウハウは有料であったり(買えよと言わないで)、絶妙に内部化されており、何となくでしか情報を手に入れることが出来ない。マネをするのも戦略としては当然考えたが、自分のブログのスタイルを変えることはしたくないのでマネできる所だけマネはする。そんなことを考えていてあることに気づいた。

「業務でデータ分析やってるし、勉強ついでに分析もやってマネタイズ出来たら最高じゃね?」

今回の分析で知見を貯めれば、ついでにデータ分析の知見も貯まるし、業務にも還元される上に、お金も貰えるかもしれないと考えるとやらない理由がない ということでこの企画をスタートしてみた。

実践編

分析に使用している全体のコードについてはgithubを参照下さい
github.com

対象のデータについて

自分のドメインを登録済みのGoogleサーチコンソール(通称:サチコ)から検索パフォーマンスのデータを.csv形式でダウンロード。
f:id:takamizawa46:20200217225408p:plain

downloadディレクトリに.csv形式のファイルが生成されているので、さっそく開いてみるが文字化け。
f:id:takamizawa46:20200217225532p:plain

しかもMicrosoft Officeのライセンス切れてて、Excelも使えない。まぁ、元々Excelでやるつもりはなかったので、データ分析のマストアイテムjupyter notebook(いつもの)を立ち上げてダウンロードしたcsvを読み込むことに。

都合が良いので先にカラムの説明を少々。何故かというと日本語表記だと参照する際に面倒なので、英語表記にしているからで「自分のcsvにはそんなカラムはないんですけど」という誤解を生み出さないため。
(左: 元のカラム名, 右: 変換後のカラム名)

  • 検索キーワード -> keyword
  • クリック数 -> click_num
  • 表示回数 -> display_num
  • CTR -> ctr(検索結果として表示された時にどれだけクリックされているかの割合)
  • 掲載順位 -> top_n

pandaspd.read_csv()という関数を使って読み込んだ結果(jupyter notebookの1セルで見えるところだけ)
f:id:takamizawa46:20200217230507p:plain

すでに色々と思うことがあるが、感想としては色んなキーワードで調べられているなぁという程度...。ここから何をすれば良いのやら。順に考えていこう。

データの概要を見てみよう

何から始めて良いのか恐ろしいほど、全く分からないので、まずは定番データの概要を見てみる。pandasdescribe()を使って表示したものがこちら
f:id:takamizawa46:20200217231309p:plain:w450

これだけでも色々なことが分かる。気になったことをリスト出ししてみた。

  • mean: top_nより -> 掲載順位は大体が4ページ目ぐらい -> 1ページ以降を閲覧する割合はかなり低かったのでこれは致命的
  • min: top_nより -> 検索すれば上から2番目(1ページ目)に表示される記事があることに驚きを隠せない
  • 50%: top_nより -> 50%の時点で掲載順位の値が43(すなわち4ページ目)であるため、半数以上の記事は検索からの流入を全く期待出来ない
  • 25%: top_nより -> 25%の記事は悪くても2ページ以内に表示されている様で20記事ぐらいは良い位置を取っている
  • 全体的: click_numより -> めちゃくちゃ低い
  • 全体的: display_numより -> この値から何を感じて良いのか分からない

ネガティブな所に目が行きがちだが、上から2番目に表示されている記事があったり、1ページ目に登場する記事が割とあったりとで素直に驚いた。PV数を稼ぐ上で「どれだけ上位の検索順位をキープできるか」かが重要だと判断出来るので、どのようなキーワードの組み合わせで1ページ目を抑えているのかを次に見ていこう。

1ページを取っているキーワード

その前にtop_nの数値では何ページ目に表示されるのかが直感的に分からないので何ページ目に表示されるかを示すpage_numというカラムを追加して、page_numの値が1になるキーワードだけを抽出した。合わせてctrの値が0以外のデータ(少なくとも1回はクリックされている)と0のデータに分けてみた。

表示されるページが1ページかつ、クリックされたことがあるキーワードの組み合わせ
f:id:takamizawa46:20200220231147p:plain

見たまんまの考察

  • mecabjanome(形態素解析器)の組み合わせ、もしくは「janome + 何か」が強い -> janomeの記事は2件だけなのになんじゃこれ
  • メインでやってるElixir(プログラミング言語)に関係するキーワードがちらほら見つかる
  • 地味にcloud functionが強くて高いctr値を保持している -> cloud fuctionの記事も2つのみだが

表示されるページが1ページかつ、クリックされたことががないキーワードの組み合わせ
f:id:takamizawa46:20200220231244p:plain

見たまんまの考察

  • やはり1ページに登場しているのは「janome, Elixir(elixir), cloud function」に関係するキーワードの組み合わせ
  • elixir const多すぎ。1ページを取ってるのでctrあげたい
  • connpass 勉強会で1ページ取ってるのやばい。ctrあげたい
  • Elixirに関連する入門記事や、基礎知識に関するキーワードが皆無。これはつらい(ほとんどがElixirに関する初級~中級の記事なため)

比較としてpage_numが4以上のキーワードの組み合わせも抽出した。数が多すぎるので部分的に。
f:id:takamizawa46:20200220233651p:plain
f:id:takamizawa46:20200220234243p:plain

見たまんまの考察

  • 訳の分からない単語が多い
  • ビッグネームの単語が多くて、なぜかヒットしてしまいページが大きな数値に
  • とはいいつつも、個人的には落としたくない単語もある(elixir enum mapとか、おそらく海外からの検索だろうけど)

一応page_num=2(2ページ目に表示)に該当するデータも最後に確認。個人的にElixir(elixir)とセットで調べられている単語を知りたいのでElixirを含むキーワードのみを抽出。
f:id:takamizawa46:20200221000042p:plain

見たまんまの考察

  • elixir 並行処理が2ページ目って凄い。ただクリックされていないので残念
  • 他のプログラミング言語 + Elixir(elixir) という組み合わせが多い。「python使いの人のためのElixirの始め方」みたいな記事書くと良さそう
  • コアな単語が少なくElixir(elixir) + syntax が多い。
  • 特に狙っていたキーワードの組み合わせがなくて消化不良感

頻出キーワードからの考察

自分のブログを第3者から見た時(検索した時)にどのようなジャンルを扱っているブログなのかを考察するために、現時点で検索に使用されているキーワードをスペース区切りでsplitしたものをカウントしてみた。結果は降順ソート済みで数多いので、上位20件と下位10件をお見せします。

[No.1]: elixir: 86
[No.2]: python: 74
[No.3]: firebase: 66
[No.4]: functions: 47
[No.5]: cloud: 46
[No.6]: golang: 33
[No.7]: mecab: 31
[No.8]: janome: 22
[No.9]: 形態素解析: 22
[No.10]: deploy: 20
[No.11]: function: 18
[No.12]: authentication: 13
[No.13]: websocket: 13
[No.14]: node: 11
[No.15]: ntt: 11
[No.16]: バイト: 9
[No.17]: login: 8
[No.18]: 並行処理: 8
[No.19]: tokenizer: 7
[No.20]: request: 7
:
[No.442]: okb: 1
[No.443]: 公務員から民間企業に転職した結果: 1
[No.444]: 電話回線: 1
[No.445]: 基礎解析: 1
[No.446]: 電話加入: 1
[No.447]: ipアドレス: 1
[No.448]: 企業: 1
[No.449]: 導線解析: 1
[No.450]: できなくてもいい: 1
[No.451]: 難しい: 1

(誰がokbで調べたんだ...)

見たまんまの考察

今までの考察通り、現在、自分のブログは以下のジャンルに支えられていることが分かる。

Elixir(elixir)が1位になっているのは嬉しいのだけど、このブログにかけている労力はElixir(elixir) > python > janome / mecab, cloud functionの順に大きいが、数字ではほとんど差がないので元々、単語が持っているトレンド性、つまりは人気度が全然違うんだろうなと思う。そこでGoogle Trendsを使って、人気度を比較してみた。ここで出ている数値は「人気度」ということでどのように算出されているかはよく分からない。

f:id:takamizawa46:20200222125032p:plain https://trends.google.co.jp/trends/explore?geo=JP&q=elixir,python

pythonが平均で84という人気度を維持する中、Elixir(elixir)は平均も1、いつでも1...。これは確かに単語のパワーが違いすぎると認めざるを得ない。

これからすべきこと

分析から分かったこと

  • 多くの記事の掲載順位は4ページ目ぐらい で、これは致命的
  • 25%の記事は悪くても2ページ以内に表示されている様で20記事ぐらいは良い位置を取っている
  • 現状のブログを支えているキーワードは「elixir, python, janome mecab, cloud function」でほとんどのPVはこの4つの関連するキーワードの組み合わせで獲得されている
  • 間違いなくキーワードにはパワーの優劣がある。elixirをメインで扱いたいがptyhonには遠く及ばない

反省すべきこと

キーワードの選定を全く意識していなかった。書きたいものを書くだけではPVが稼げないのは数字から分かる。残念ながらElixir(elixir)というトピックだけではPVを稼ぐことは難しいため、何かしらの戦略が必要になってくるが脳筋で「Elixir(elixir)かつコアな記事」を書き続けていたことを反省。それもそれでブログとしてはありだけど、それだけではなく、流入を行うための記事も必要になる。

掲げた戦略

  • Elixir(elixir)という単語だけではパワーに欠けるため、検索キーワードにも見られた「他プログラミング言語 x Elixir(elixir)」という分野を狙っていく(eg: python好きのためのElixir(elixir)入門みたいな)
  • PVを稼いでいると思われる記事が何となく分かったので検索されているキーワードの組み合わせの形に合うようにリライトを行う
  • パワーのある単語で1ページ目を狙う(eg: 「プログラミング 独学」, 「プログラミング 挫折」など)

次回について

今回の分析で分かったことと、掲げた戦略に対する取り組みを行い、どれだけPV数が変化するのかを次回の記事にて報告しようと思います。自分自身、今まで学んだ分析に合わせて、トライアンドエラーで得られる知見が凄い楽しみかつ、PV数が増えるかもしれないという明確な報酬があるため気合いが入りまくっています。

では、第1回の結果をお楽しみに。

【第17回清流elixir勉強会】Elixirを用いた並行和アルゴリズムの実装

トピック

あけましておめでとうございます。1月2月は色々と忙しくて、全く清流elixirにて勉強会を開催できず...
2ヶ月ぶりの開催となりましたが常連の方、新規の方、リモートでの接続、多くの方に参加して頂けました。ありがとうございました。ハッピーターンを持ち合わせていったのですが気づいたら自分が半分以上食べてしまっていました。忍びない..

elixir-sr.connpass.com

清流elixir-infomation
開催場所: 丸の内(愛知)
参加人数: 3 -> 6 update!
コミュニティ参加人数 : 37 -> 40
2020/02/14現在

第17の勉強会について

今回は新年最初の勉強会であったため、メインテーマがお恥ずかしながら未定であったため、「もくもく会」という形で勉強会を開催しました。各々、自身の取り組みたい内容について約2時間、取り組みました。僕は最近読み始めた「並行コンピューティング技法」の書籍で第6章で紹介されている「並行和」というアルゴリズムの実装をElixirで挑戦してみました。書籍にはOpenMPとPthreadでのコードが記載されているのですが中々のボリュームな上、OpenMPやPthreadは計算機科学の分野で広く仕様されるライブラリため親しみが全くありません。これを「Elixirで書き換えたらどんなもんやろなぁ」という単純な興味からのスタートです。

www.okb-shelf.work

実装したコードはこちら
github.com

並行和について

詳しくは「並行コンピューティング技法 第6章のまとめ」という記事を後日出し、そちらで解説を行うが、ざっくりとした説明をしておく。何かしらのデータが手元に存在しているとして、それはN個の要素を持つ配列だと考える。このN個の要素を持つ配列をM個の間隔でデータを分割してチャンクを作成する。作成したチャンクをチャンク数分だけ用意したスレッド、もしくはプロセスの内部で配列の要素を合計した数値を算出する。そして、それぞれのスレッド、もしくはプロセスによって合計された値をメインのスレッド、もしくはプロセスにて集約する。この一連の処理を「並行和」と言っている。

# N個(今回はN=10)の要素を持つ配列を用意
[1,2,3,4,5,6,7,8,9,10]

# 用意したデータをM(今回はM=2)の間隔で分割してチャンクを作成
[
  [1,2],
  [3,4],
  [5,6],
  [7,8],
  [9,10]
]

# 作成したチャンクをそれぞれのスレッド、もしくはプロセスに割り当てて合計値を算出する
[
  [1+2],
  [3+4],
  [5+6],
  [7+8],
  [9+10],
]

# それぞれのスレッド、もしくはプロセスによって算出された合計値をメインのスレッド、もしくはプロセスにて合計
[3 + 7 + 11 + 15 + 19] -> 55

この一連の処理をElixirを用いて作成していく。

暗黙の仕様について

手元に丁度、頃合いのデータがないこともあり、作成するデータの性質と自動でスケールさせるかについて実装前に決めておく。以下の仕様(といっても、作成するのは自分だけなので暗黙の仕様ということで)を実装にあたり遵守する。

  • データ分割による並行化を行い、データのサイズによって自動でスケール(起動させるプロセス数をコントロール)するようにする
  • データはN個の要素を持つ配列を採用。要素はint型の上限値を定めて生成するランダム値

この勉強会の2時間の間で何とか動く所までは実装することが出来た。といっても、並行処理に関してはElixirは実装されているモジュールが優秀なため、ほとんど自前で作成したものはなく、ほとんどがwrap関数とhelper関数になった。さすがのElixir

並行処理の起動までの流れ

実装についてはgithubの方を参照して頂ければ良いので全体の処理の流れを解説しておく。

  • N個のランダム値の要素を持つ配列を用意
  • M個の間隔でチャンクに分割
  • 分割したチャンクをプロセスの専属データとして、プロセスを新規で立ち上げる
  • それぞれのプロセスにて合計値(sum)を算出
  • メインのプロセスにて各プロセスの合計値を合計

この一連の処理をまとめているのが以下のファイル
./parallel_sum_ex/lib/parallel_sum.ex

defmodule ParallelSum do
  @moduledoc """
    メインモジュール。他モジュールにて抽象化した関数群をpipeline化する
  """

  @doc """
    並行処理によって合計値を算出するpipeline関数
  """
  def parallel_exec(_, ""), do: :error
  def parallel_exec([], _), do: :error
  def parallel_exec(data_size, mode)  do
    # ランダム値を要素に持つリストを分割したチャンクデータを作成
    dataset =
      ParallelSum.DataCreater.create_N_size_list(data_size)
      |> ParallelSum.Scheduler.list_spliter()

    # 立ち上げるプロセス数を算出
    process_num = ParallelSum.Scheduler.calc_total_process(data_size)

    # プロセスを立ち上げて並行和を算出
    parallel_sum = ParallelSum.Scheduler.start_task(mode, dataset, process_num)
    Enum.sum(parallel_sum)
  end
end

別ファイルにて定義したモジュール内関数を利用して先ほどの流れを実行していく。自動スケールについてだが、スケール、すなわち立ち上げるプロセス数は全データのサイズをconfigにて設定した1プロセスが扱える最大要素数の数によって決定される。算出式はめちゃくちゃシンプルで全データ数を1プロセセスが扱える最大要素数で割るだけ。仮にデータ数が1000であり、最大要素数が100であれば10個のプロセスを立ち上げることになる。

./parallel_sum_ex/lib/scheduler.ex

defmodule ParallelSum.Scheduler do
  @moduledoc """
    データのチャンク分割とプロセスの起動の一連のスケジューリングを実行するための関数
    他プロセスによるタスク実行はmessage passingでやると手間なのでTaskに任せる
  """

  @doc """
    configに設定された1chunkが扱える最大要素数を元に立ち上げるべきプロセスの総数を算出する
    processs_num = 配列の要素数 / 1チャンク辺りの扱える要素の最大値
  """
  def calc_total_process(data_size) do
    each_chunk_size_max = ParallelSum.Config.each_chunk_list_size_limit()
    div(data_size, each_chunk_size_max)
  end
end

少しだけテクいことをしているとすれば、Taskモジュールを仕様したタスクを持つプロセスの作成と実行結果の受け取り部分。過去に何度も同じことをやったことがあるが、Task.async()に実行する無名関数を渡して、生成されたプロセスのpidを要素に持つリストを作成する。パイプライン演算子を仕様して、立ち上げたプロセスの実行結果をTask.await()とすることで受け取っていく。割と頻出の処理なので新規性は特にない(はず)。

Taskの使い方は過去の記事にまとめているのでご参照ください。

www.okb-shelf.work

再帰関数とEnum.sum()を使った際でパフォーマンスに差が出来るのかを後々、測定したいので第1引数に対してパターンマッチを行い、実行する無名関数を変化させている。あいかわらずパターンマッチ半端ないって。
./parallel_sum_ex/lib/scheduler.ex

defmodule ParallelSum.Scheduler do
  @moduledoc """
    データのチャンク分割とプロセスの起動の一連のスケジューリングを実行するための関数
    他プロセスによるタスク実行はmessage passingでやると手間なのでTaskに任せる
  """

  @doc """
    分割されたデータと算出された立ち上げるプロセス数を元にTask moduleを起動して再帰関数によって集計処理を実行
    各プロセスが算出した合計値がリストになって返ってくる
  """
  def start_task(_, lst, _) when length(lst) == 0, do: :error
  def start_task(_, _, 0), do: :error
  def start_task("", _, _), do: :error

  def start_task("recursive", splited_lst, process_num) do
    # プロセスを起動し、チャンクを割り当て
    Enum.map(1..process_num, fn index ->
      Task.async(fn -> ParallelSum.Sum.recurcive_sum(Enum.at(splited_lst, index-1)) end)
    end)
    |> Enum.map(fn task -> Task.await(task) end)
  end

  def start_task("enum", splited_lst, process_num) do
    # プロセスを起動し、チャンクを割り当て
    Enum.map(1..process_num, fn index ->
      Task.async(fn -> ParallelSum.Sum.sum(Enum.at(splited_lst, index-1)) end)
    end)
    |> Enum.map(fn task -> Task.await(task) end)
  end
end

Usage

今回は実行結果を確認する部分までしか実装できなかった。本当はメモリ使用率や経過時間なども出力して、グラフ化して色々と満たされたいのだがご愛嬌。実行にはElixirがインストール済みである必要がある。

# iexを立ち上げる
> $ iex -S mix


# N -> 配列の要素数
# mode -> sumの集計方法(recurcive -> 再帰処理, enum -> Enum.sum() のどちらか)

# 並行和(複数プロセスによる算出)
iex> ParallelSum.parallel_exec(N, mode)

# 逐次和(単一プロセスによる算出)
iex> ParallelSum.serial_exec(N, mode)

実行結果

iex(1)> ParallelSum.parallel_exec(100_000, "recursive")
50084347

iex(2)> ParallelSum.parallel_exec(100_000, "enum")     
50113202

iex(3)> ParallelSum.serial_exec(100_000, "recursive")
50099269

iex(4)> ParallelSum.serial_exec(100_000, "enum")     
50106982

実装して気づいたけど、明らかに逐次処理の方が速い。何でだろう...🤔
プロセスの立ち上げやデータ参照等がオーバーヘッドになっているということだろうけど厳密な検証は実行時間などを確認出来る様にしてから行なっていこう

参加者の方々の成果物

確認できるものを一部、参照させて頂きます。

kikuyutaさん

自分はハードウェアはさっぱりですが、内部処理の仕組みについて教えて頂きました。

GKBR56さん
ElixirでMMORPGを作られている方です。youtubeで実際に動くところを確認することが出来ます。オンラインゲームの仕組みがどうなっているか自分はさっぱりですが、最初はシンプルなechoサーバーを作るところから始めたと伺いました。

https://www.youtube.com/watch?v=nwxHhOcTHBw


mmo.ex デモ動画

近日、新たな動画を公開予定だそうです。Qiitaの記事は以下を参照下さい
qiita.com

【並行コンピューティング技法】第3章のまとめ

前回までのあらすじ

実際に並行処理を記述する際にどのように手法を決めて実装していくのかという話が第2章のメインであった。 並行処理の方針を決める手法は以下の2通りで、第2章ではそれぞれの特徴やマナー、サンプルに触れながら内容が進んでいく。

  • タスク分解
  • データ分解(データをチャンクに分解)

加えて、筆者が並行処理を作成する際に、何を重要としているか(1にスケーラビリティ, 2にパフォーマンス)という話も登場した。詳細は、前回の記事をご参照下さい。

www.okb-shelf.work

第3章

まずは、いつものように見出しからどんな情報が得られるかを観察。第3章は大きく以下のような構成になっている。

  • 並行処理(並行アルゴリズム)の検証方法 -> どのように連続的整合性を確認するのか
  • 実際に2つのスレッドを用いたアルゴリズムをベースに筆者と共に検証(4段階)
  • 並行化した際の指標

ご覧の通り、第3章は作成した並行処理に対してどのように検証作業を行なっていくのかがメイントピックになっており、並行処理を作成した後の話になる。しかし、並行処理を記述する際にも、第3章で登場する考え方(一般化方法)は非常に重要であり、「ちゃんと読んでね」という筆者のメッセージが込められているため、飛ばさずに読む。

並行処理の検証について

M.Ben-Ariという強すぎる方が記した著書「Principles of Concurrent and Distributed Programming」で並行処理、および分散プログラミングが仕様通りに動いているかを一般化(誰がやっても同じようになるように)したそうで、この一般化方法が並行アルゴリズムを設計する上で最も大切になるとのこと(絶対に後悔させないから読んでくれと熱いメッセージが記述されている)。まずはこのBen-Ariが提案した一般化方法を理解することからこの章は始まる

Ben-Ariの並行アルゴリズムの一般化

「うわー、理論的で絶対に難しいやろなぁ」と思いつつ読み進めてみると人生初遭遇の単語が多いが、書かれている内容は意外とシンプルで慌てる必要はなさそう。順に見ていく。

プログラムはアトミックな実行文の連続である

その通りだとしか言いようがない。アトミックというのは原子を英語でアトムというので、処理を構成する部品のことだと思えばいい。本書によると、この粒感はプログラムコードの1行に当たるものであったり、アセンブリ言語で記述された1行であったりと見方によって変化するようだが、並行アルゴリズムを設計する際には高級言語で記述するコードの1行だと考えるのが良いのだそう。

どんな複雑な処理であっても、コードを1つ1つ分解していけば、1行のプログラムコードの連続になるという主張になる

name = "OKB" # アトミック
print(name) # アトミック

if name != "OKB": # アトミック
  print("Who are you???") # アトミック
インタリーブ(意味: 相互に挟み込む)

並行プログラムは必ずしも複数のスレッドが毎回、同じ順番で実行されることはなく、実行する度に処理される順序や処理時間は変化する。しかし、毎回実行する度に結果が異なるというのは冪等性に欠けており並行プログラムとしては欠陥品と言える。この複数のスレッドの実行文がそれぞれ、実行される順序の組み合わせのことを「インタリーブ」と呼び、並行プログラムは実行されうる全てのインタリーブ(組み合わせ)で同じ結果になることを証明する必要がある

インタリーブの数はスレッド数が増えると指数関数的に増えるため、全てのインタリーブに注目するのではなく、連続的整合性を証明するために必要な部分のみを取捨選択することが重要になる。

軽くPythonでサンプルを記述してみた。以下の処理は買い物カゴに関数funcAが引数に渡されてきた値を追加し、関数funcBが先頭要素を取り出し、返すという極シンプルなもの。逐次処理でfuncA -> funcBという順に呼び出されるのであれば問題はなさそうだが、並行プログラムとして関数それぞれをスレッドで動作させるとどうだろうか。

# 買い物カゴ
cart = []

# スレッドAに実行される関数
def funcA(goods):
  print("A was start")
  cart.append(goods)
  
# スレッドBに実行される関数
def funcB():
  print("B was start")
  # 買い物カゴから先頭要素をpop
  goods = cart.pop(0)
  return goods

# 以下のように逐次実行されるなら問題はないが...  
funcA("orange")
print(cart)
funcB()
print(cart)

インタリーブを考えると以下のようになる。print()の処理はどんな順で呼び出されても今の所は問題ないだろうからインタリーブからは外しておく。

  • case: 1
    • A: カートに値を追加
    • B: カートから値を取得
  • case: 2
    • B: カートから値を取得
    • A: カートに値を追加

case: 2の場合はリストに値が存在しないにも関わらずpop()を実行することになるため、実行時errorになる。これは先ほど述べた「実行されうる全てのインタリーブで同じ結果になる」という証明が出来ていないため、並行プログラムとしては欠陥品だと言える

Traceback (most recent call last):
  File "Main.py", line 17, in <module>
    funcB()
  File "Main.py", line 12, in funcB
    goods = cart.pop(0)
IndexError: pop from empty list
Ben-Ariの4つの並行実行一般化まとめのまとめ

本書にあるものを噛み砕いて記述しておく。情報を整理して覚えやすくする。

  • プログラムは連続したアトミックな実行文
  • 並行プログラムは複数スレッドが実行する処理の組み合わせ(アトミックのインタリーブと本書に有り)
  • 並行プログラムは実行されうる全てのインタリーブで結果が同じになるように
  • どのインタリーブでも他スレッドの実行を妨げてはならない -> 他スレッドの処理を停止させたり、実行を占有してはならない

これでBen-Ariの並行アルゴリズムの一般化方法に関する知識を身につけたことになる。次にBen-Ariの一般化方法をどのように使用するのかという話になっていく。

例題: クリティカルセクション問題

並行アルゴリズムで共有変数を参照/更新する部分をクリティカルセクションと言い、並行アルゴリズムを設計する上で良い例題となるらしい(著者はクリティカルリージョンと言うのが好みの様)。クリティカルセクションでは以下2つの性質を満たす必要がある。

守るべき性質

  • 排他制御(アクセス可能なスレッドは1つのみ)
    • 他スレッドが共有変数を参照/更新 中は共有変数へのアクセスを禁止される。
  • 他スレッドが他スレッドの共有変数へのアクセスを妨げてはならない

簡略化のためスレッドは2つで固定(スレッドzeroとスレッドone)してインタリーブを考える。コードをそのまま転載するのはアレなので、Pythonでそれっぽいものに書き換えたものを載せておく。
以下のコードはそのまま実行すると自動で終了しないので注意して下さい!!

第1段階
thread_number = 0 # 実行中のスレッド番号

def thread_zero():
  while True:
    # spin wait(待機し続ける)
    while(thread_number == 1):
      pass
    # 共有変数へのアクセス
    critical_region_zero()
    # 実行スレッドを0から1に切り替え
    thread_number = 1
    # 何かしらの処理
    other_stuff_zero()
  

def thread_one():
  while True:
    # spin wait(待機し続ける)
    while(thread_number == 0):
      pass
    # 共有変数へのアクセス
    critical_region_one()
    # 実行スレッドを1から0に切り替え
    thread_number = 0
    # 何かしらの処理
    other_stuff_one()

それぞれのスレッドが実行される度にthread_numberの値を参照して、自分に実行権があるのかを確認する。実行権があればクリティカルリージョンへのアクセスを行う。アクセスが終了した際にthread_numberをzeroなら1に、oneなら0に切り替える。これで問題なく排他制御がされるはず。

しかしながら、これは互いのスレッドが動作していることが前提であり片方のスレッドが何かしらの原因で停止した際には、もう片方のスレッドはthread_numberの値が変わることを期待して待機し続けてしまい結果的にデッドロックが発生してしまう。

第2段階
thread_0_inside = 0 # zeroがクリティカルリージョン中か
thread_1_inside = 0 # oneがクリティカルリージョン中か

def thread_zero():
  while True:
    while(thread_1_inside):
      pass
    # クリティカルリージョンへアクセス中ということを表明
    thread_0_inside = 1
    # 共有変数へのアクセス
    critical_region_zero()
    # クリティカルリージョンへのアクセスが終了したことを表明
    thread_0_inside = 0
    # 何かしらの処理
    other_stuff_zero()
  

def thread_one():
  while True:
    while(thread_0_inside):
      pass
    # クリティカルリージョンへアクセス中ということを表明
    thread_1_inside = 1
    # 共有変数へのアクセス
    critical_region_one()
    # クリティカルリージョンへのアクセスが終了したことを表明
    thread_1_inside = 0
    # 何かしらの処理
    other_stuff_one()

第1段階で課題となった「他方のスレッドが停止した場合にデッドロックが発生する」という問題に対応するために、他方のスレッドがクリティカルリージョンへアクセスしている際には自身の処理をスピンウェイトするという処理に更新された。こうすることで他方が何かしらの原因で停止したとしてもデッドロックは発生しなくなる。しかしながら、この方法には大きな問題がある。thread_0_insideの値、もしくはthread_1_insideの値を1に更新する最中に、他方のスレッドがクリティカルリージョンにアクセスしてしまう可能性がある。これは排他制御が機能していないことを意味している。
このようにそれぞれのスレッドが「アクセスするで」という意思を表明した後に、すぐにクリティカルリージョンにアクセスするような解法を著者は「自分勝手なスレッド」と言うそう。

第3段階
thread_0_wants_enter = 0 # zeroがクリティカルリージョンへのアクセスを依頼中か
thread_1_wants_enter = 0 # oneがクリティカルリージョンへのアクセスを依頼中か

def thread_zero():
  while True:
    # zeroがクリティカルリージョンへのアクセスをしようとしていることを表明
    thread_0_wants_enter = 1
    while(thread_1_wants_enter):
      pass
    # 共有変数へのアクセス
    critical_region_zero()
    # zeroがクリティカルリージョンへのアクセスが終了したことを表明
    thread_0_wants_enter = 0
    # 何かしらの処理
    other_stuff_zero()

def thread_one():
  while True:
    # oneがクリティカルリージョンへのアクセスをしようとしていることを表明
    thread_1_wants_enter = 1
    while(thread_0_wants_enter):
      pass
    # 共有変数へのアクセス
    critical_region_one()
    # oneがクリティカルリージョンへのアクセスが終了したことを表明
    thread_1_wants_enter = 0
    # 何かしらの処理
    other_stuff_one()

先ほどの自分勝手なスレッドを改修して、第3段階ではクリティカルリージョンへのアクセスの意思を表明してから、他方のスレッドがアクセスしようとしていないかを確認するようになっている。こうすることで同時にクリティカルリージョンへアクセスする問題は発生しなくなり、排他制御が正常に行われると判断出来ると思いきや、先ほどと同じ様にデッドロックが発生しうる。
thread_0_wants_enterthread_1_wants_enterがほぼ同時に1に更新された時に、お互いに値が0になるまでスピンウェイトする。これはいつまでたっても終わることがない。互いに「どうぞどうぞ」と道を永遠に譲り続ける状態となってしまう。

第4段階
import time
import random

thread_0_wants_enter = 0 # zeroがクリティカルリージョンへのアクセスを依頼中か
thread_1_wants_enter = 0 # oneがクリティカルリージョンへのアクセスを依頼中か

def thread_zero():
  while True:
    # zeroがクリティカルリージョンへのアクセスをしようとしていることを表明
    thread_0_wants_enter = 1
    while(thread_1_wants_enter):
      # アクセスしたい表明を一旦取り下げる
      thread_0_wants_enter = 0
      # ランダム秒だけ待機
      time.sleep(random.uniform())
      # 再度、アクセスを表明
      thread_0_wants_enter = 1
      
    # 共有変数へのアクセス
    critical_region_zero()
    # zeroがクリティカルリージョンへのアクセスが終了したことを表明
    thread_0_wants_enter = 0
    # 何かしらの処理
    other_stuff_zero()

def thread_one():
  while True:
    # oneがクリティカルリージョンへのアクセスをしようとしていることを表明
    thread_1_wants_enter = 1
    while(thread_0_wants_enter):
      # アクセスしたい表明を一旦取り下げる
      thread_1_wants_enter = 0
      # ランダム秒だけ待機
      time.sleep(random.uniform())
      # 再度、アクセスを表明
      thread_1_wants_enter = 1

    # 共有変数へのアクセス
    critical_region_one()
    # oneがクリティカルリージョンへのアクセスが終了したことを表明
    thread_1_wants_enter = 0
    # 何かしらの処理
    other_stuff_one()

第3段階の単純なスピンウェイトを改めて、すでに他方のスレッドがクリティカルリージョンへアクセスした場合に自身がアクセスしたいという表明(thread_0_wants_enterもしくはthread_1_wants_enterを0に)を取り下げて、ランダム秒だけ待機する。2つの乱数が一致することが無い限り、お互いに道を譲り合うような状態は発生することが無くなり、デッドロックは起こり得ない。 お、完璧やんと思いつつ、実はまだ問題がある。
例えば、zeroのスレッドが処理を終了しthread_0_wants_enterの値を0に戻す。次に新たなスレッドがクリティカルリージョンへのアクセスを依頼する。依頼してきたのはzeroのスレッド。thread_1_wants_enterの値を確認し、0であれば(0と仮定)、thread_0_wants_enterを1に更新して、クリティカルリージョンへのアクセスを再度行う。アクセスが終了したタイミングでthread_0_wants_enterを0に更新する。再度、新たなスレッドがクリティカルリージョンへのアクセスを依頼する。依頼してきたのはzeroのスレッド...

このようにどちらかのスレッドが永遠にアクセス権限を独占してしまう可能性があるということが記述されている。この現象をスターベーション(枯渇)と言う。なるほど。この問題を解消するためにDekkerのアルゴリズムを実装するが、その前にデッドロックが発生する4条件について触れておく。

デッドロックを発生させる4つの条件

左側が本書から引用した言葉、右側が意味を噛み砕いた文章。

  • 1.相互排他条件 -> 同時にいくつかのスレッドがアクセス可能。もしくは1スレッドのみがアクセス中
  • 2.獲得後のウエイト -> アクセスを終了したスレッドが再度、続けてアクセスをしようとする
  • 3.プリエンプトなし(先取する) -> 参照された値を削除して良いのは参照したスレッドのみ(値の解放)
  • 4.循環待ち -> すでに他スレッドで参照されている値を参照しようとする

1, 2は何となくアカンのやろなぁと分かるが、3, 4に関しては場面想定が上手く出来ないが、鍵を持って箱を開けて得た中身の処理は開けた本人(スレッド)に責任を要求するという様に考えておけばいいだろう。

Dekkerのアルゴリズム

先ほど第4段階で実装したコードをDekkerアルゴリズムの形に改修する。

auth = 0 # クリティカルリージョンへアクセス可能なスレッド番号(スイッチ)
thread_0_wants_enter = 0 # zeroがクリティカルリージョンへのアクセスを依頼中か
thread_1_wants_enter = 0 # oneがクリティカルリージョンへのアクセスを依頼中か

def thread_zero():
  while True:
    # zeroがクリティカルリージョンへのアクセスをしようとしていることを表明
    thread_0_wants_enter = 1
    while(thread_1_wants_enter):
      # アクセス権限があるかを確認
      if auth == 1:
        # アクセスしたい表明を一旦取り下げる
        thread_0_wants_enter = 0
        # アクセス権限がなければ待機
        while auth == 1:
          pass
        # 再度、アクセスを表明
        thread_0_wants_enter = 1

    # 共有変数へのアクセス
    critical_region_zero()
    # アクセス権限を譲る
    auth = 1
    # zeroがクリティカルリージョンへのアクセスが終了したことを表明
    thread_0_wants_enter = 0
    # 何かしらの処理
    other_stuff_zero()

def thread_one():
  while True:
    # oneがクリティカルリージョンへのアクセスをしようとしていることを表明
    thread_1_wants_enter = 1
    while(thread_0_wants_enter):
      # アクセス権限があるかを確認
      if auth == 0:
        # アクセスしたい表明を一旦取り下げる
        thread_1_wants_enter = 1
        # アクセス権限がなければ待機
        while auth == 0:
          pass
        # 再度、アクセスを表明
        thread_1_wants_enter = 1
      
    # 共有変数へのアクセス
    critical_region_one()
    # アクセス権限を譲る
    auth = 0
    # oneがクリティカルリージョンへのアクセスが終了したことを表明
    thread_1_wants_enter = 0
    # 何かしらの処理
    other_stuff_one()

構造としてはクリティカルリージョンにアクセス可能なスレッドをauth(本書にはfavoredとあり)という変数によって制御する。このauthはいわば、クリティカルリージョンへアクセスするための優先権となる。どちらかのスレッドがクリティカルリージョンへアクセスしようとしても、アクセスするための優先権がなければ、アクセスすることは出来ない。そのため、先ほどのように何度も同じスレッドがクリティカルリージョンへアクセスするという現象を回避することが出来る。より詳細なインタリーブは本書を読んでみてほしい。

性能評価

いたって単純。算出式の妥当性だとか、どうしてこの式が定義されているのかは追求しない。賢い人に任せる。 評価の軸は以下の2つ。

  • 高速化率
  • 実行効率
高速化率

「従来の逐次処理に対して、並行化したこの処理は300%早くなりました!」と言われるよりも「従来の逐次処理の3倍早くなりました」と言われた方が直感的で理解し易い。高速化率とは元の逐次処理が並行化したことで何倍早くなったのかを算出するための指標値になる。高速化率を算出するためには逐次処理の実行時間数値が必要になるので注意が必要。理想値はコア数に比例して高速化率も増えていくような数値になる。(コア数が10倍 -> 高速化率も10)

算出式は以下の2つが紹介されており、2つの算出式の違いはGustafson-Barsisの法則ではコア数が増えるにつれて、データ量の増加しているという前提が含まれているという点で算出方法が異なる。

Amdahlの法則: Amdahl's law - Wikipedia
Gustafson-Barsisの法則: Gustafson's law - Wikipedia

実行効率

高速化率をコア数で割れば算出可能。どれだけコンピューターリソースを上手く使えたかという指標になる。仮に90%という値が算出されたのであれば、残りの10%は処理全体を平均して、全てのコアが10%はアイドルになっていたということになる。この10%を減らすことが出来れば、完璧な効率化が出来たといえるが、実際になぜ実行効率の値が低いのかと言う原因は要因が複雑すぎるため、厳密な考察が必要だそう。

以上

参考文献