【レポート】第11回清流elixir勉強会を開催しました【Taskを使った並行処理に入門】

トピック

今回で第11回目の勉強会を開催致しました
elixir-sr.connpass.com

隔週にて勉強会を開催していたのですが、先月は仕事の都合で日程が合わずで3週間時間が空いてしまった
こういうのは一回サボる癖が付いてしまうと徐々に習慣化してしまうので気をつける様にしなければと反省
仕事って大変だ

最近は毎回参加して頂ける方、新規の参加者の方も増え、8人用のレンタルオフィスが手狭に感じるようになった
主催者側としてはより快適な場所を提供したいという気持ちもあるがこのローカル、アングラ感が気に入っていたりもする

さて今回はついにElixirの強力な並行処理について入門していくことになった

清流elixir-infomation
開催場所: 丸の内(愛知)
参加人数: 5 -> 6 update!
コミュニティ参加人数 : 13 -> 15 update!
20190908現在

第11回の勉強会の内容について

Elixirでの並行処理について話を始める前に、まず並行処理というものが何なのか、よく耳にする並列処理と何が違うのかという部分からスタートした

ざっくりと並行処理と並列処理について

技術が進化し、現代のCPUはマルチコアであることが一般的だ
ここでいうコアとはCPUの中に、CPUと同じ様な演算が行えるものが入っていると思えばいい(厳密にはCPUとイコールではない)
つまりはCPU in CPUという状態でたくさんの処理を行うことが可能になるというわけだ
よく聞くデュアルコアとは2つのコアを持つCPUで、クアッドコアというのも最近よく聞くようになった

そして我々がyoutubeを見たり、iTunesを使って音楽を聞いたりという操作をPCで行うわけだが、この際にそれぞれの仕事(task)はプロセスという単位で管理がされている
youtubeを見るためにブラウザを立ち上げることでブラウザを管理するプロセスが立ち上がる。iTunesも同様だ
このプロセスというものをどの様に扱う(動かす)かで並列処理なのか並行処理なのかが分かれる

並行処理について

1つのコア上で複数のプロセスを動作させることをいう。イメージとしては同じキッチンで切る、煮る、焼く、揚げるなどを1人が行なっていると思ってもらえれば分かりやすいのではないかと思う
あまりにも高速で動いているため、それぞれの動作が同時に行われているように見えるが、実際には高速で行う処理を切り替えている
この切り替えをコンテキストスイッチと呼ぶ(詳しくはggってね)

Elixirで並行処理を行うにはプロセス間でやり取りを行えば良い
よく取り上げられるElixirの強力な並行性というのは並行処理を指している場合が多いのではないだろうか
この部分は非常に混同しやすいので自身も気をつけている

  • process間通信(messageの送受信)
  • Task
  • Agent etc...

並列処理について

こちらは複数のコアでそれぞれ1つのプロセスを割り当てて処理することをいう。イメージとしては複数のキッチンそれぞれで1品の料理を調理すると思ってもらえれば分かりやすいのではないかと思う
性能という数の暴力とでもいえばかなり効率が良さそうだ

Elixirで並列処理を行うためには以下を使用するらしいが、私自身こちらにまだ触れていないため間違いがあれば指摘頂きたい

  • OTP: behaviourの1つ GenStage
  • 外部ライブラリ: Flow

Elixirでのプロセスの扱いについて

通常プロセスを生成する祭にはOSのAPIを使用して管理を行うが、Elixirの場合はErlang実行環境である、EVM(Erlang Virtual Machine)上で独自のプロセスを生成する(EVMで生成されるプロセスの実態はCの構造体)
このプロセスはそれぞれが個別にメモリ空間を確保しており、ヒープやスタックなどとCPUを使用する権限を持ち合わせている
そしてCPU使用の権限はプロセス毎に移り変わっていく

そうすることでCPUを独占してしまうような再帰処理を防ぐことが出来るし、メモリ空間をプロセス毎に持っているため、ガベージコレクタをプロセス単位で実行することが可能である

Taskについて

f:id:takamizawa46:20190908130053j:plain
Elixirには気軽にプロセスを生成して並行処理を行うためのTaskというモジュールが実装されている
つまりはこのTask自身もプロセスということになる(元のプロセスから生成される新たなプロセス)

最も気軽にTaskで並行処理をするための関数にTask.asyncというものがある
Task.asyncは非同期処理を行うため、実行結果を取得するためにawaitを行う
Task.async, Task.awaitの詳しい記述方法については公式ドキュメントを参照してもらうか、私が以前かいた記事を見て頂ければと思う

www.okb-shelf.work

まずはシンプルに受け取った数値に+2をする関数をTaskを使って並行処理で実行させてみる

# 無名関数の作成
iex(1)> func_ = fn num -> num + 2 end
#Function<7.91303403/1 in :erl_eval.expr/5>

# 作成した無名関数をasyncに渡す(fn -> end)の形でないとerrorになるため注意
iex(2)> task = Task.async(fn -> func_.(10) end)
%Task{
  owner: #PID<0.352.0>, # 生成元のプロセス認識子
  pid: #PID<0.2140.0>, # 生成されたのプロセス認識子
  ref: #Reference<0.3658657072.3843293185.212981>
}

# 実行結果を受け取る
iex(3)> Task.await(task)
12

# おまけ: 生成元のプロセス認識子
iex(4)> self()
#PID<0.352.0>

次に同じ処理を100個のプロセスを生成して実行させてみる
パイプ演算子を使って一気にawaitまでを実行する

iex(1)> res = Enum.map(0..100, fn num -> Task.async(fn -> num + 2 end) end) |> Enum.map(fn p -> Task.await(p) end)
[2, 3, 4, 5, 6, 7, 8, 9, 10,
 11, 12, 13, 14, 15, 16, 17,
 18, 19, 20, 21, 22, 23, 24,
 25, 26, 27, 28, 29, 30, 31,
 32, 33, 34, 35, 36, 37, 38,
 39, 40, 41, 42, 43, 44, 45,
 46, 47, 48, 49, 50, 51, ...]

# おまけ
iex(2)> Enum.sum(res)
5252

iex(3)> accurate = Enum.sum(2..102)
5252

iex(4)> Enum.sum(res) == accurate
true

これで基礎はバッチリ抑えた。次にAPIを並行処理でcallしてみる

mixプロジェクトの立ち上げと関数の用意

callするAPIはおなじみのジブリAPI
httpクライアントの外部ライブラリであるpoisonを使用したいためmixのプロジェクトを立ち上げる

$ mix new ghibli_app
$ cd ghibli_app

次に必要な外部ライブラリをmix.exsのdepsに記述する

defp deps do
    [
        {:httpoison, "~> 1.5"}
    ]
  end

もし使用しているElixirのversionが1.3もしくはそれ以下の場合にはmix.exsのapplicationに以下を追加

def application do
  [applications: [:httpoison]]
end

準備が整ったので、外部ファイルをダウンロードする

$ mix deps.get
Resolving Hex dependencies...
Dependency resolution completed:
New:
  certifi 2.5.1
  hackney 1.15.1
  httpoison 1.5.1
  idna 6.0.0
  metrics 1.0.1
  mimerl 1.2.0
  parse_trans 3.3.0
  ssl_verify_fun 1.1.4
  unicode_util_compat 0.4.1
* Getting httpoison (Hex package)
:
:
* Getting parse_trans (Hex package)

無事にダウンロードされたようだ。この時点でmix deps.compileというコマンドを使用して自身でcompileを行なってもいいが、次回のiexプロンプトの立ち上げ時に自動でcompileが実行されるため、行わなくても良い

一度、iexプロンプトを起動して./lib/ghibli_app.ex に標準で記述されているhello関数を実行してみる

$ iex -S mix
Erlang/OTP 21 [erts-10.2.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] [dtrace]

===> Compiling parse_trans
===> Compiling mimerl
:
:
Generated test app

自動でコンパイルが実行される
さっそくhello関数を呼び出してみよう

iex(1)> GhibliApp.hello
:world

上手く動作されることが確認できたので./lib/ghibli_app.exにAPIをcallする関数と並行処理を行う関数を記述していく
今回使用するAPIのエンドポイントは以下だ。GETメゾットで脳死で使用できるものを選択した

https://ghibliapi.herokuapp.com/films

defmodule GhibliApp do
 @moduledoc """
 Documentation for GhibliApp.
 """
 @doc """
 Hello world.
 ## Examples
     iex> GhibliApp.hello()
     :world
 """

 # APIをcallする関数
 def call_ghibli_api do
   HTTPoison.get!("https://ghibliapi.herokuapp.com/films")
 end

 # 受け取った指定数のプロセスを実行してAPIをcallする
 def parallel_exec(total_process) do
   Enum.map(1..total_process, fn _num ->
     Task.async(fn -> call_ghibli_api() end)
   end)
   |> Enum.map(fn p -> Task.await(p) end)
   |> Enum.map(fn data -> data.status_code end) # 戻り値の構造体からstatus_codeを抽出(データ量が多く可読性のため追加)
 end
end

ファイルに編集を行なったので立ち上がっているiexプロンプトに再度compileをするようにコマンドを記述する

iex(2)> recompile
Compiling 1 file (.ex)
:ok

続いて、メインの処理(parallel_exec)を実行するが、大量のプロセスを生成して並行でcallするとサーバーに迷惑となるので常識ある数にする
今回は適当にジョジョのアニメ5部完結記念に4を選択した

iex(3)> GhibliApp.parallel_exec(4)
[200, 200, 200, 200]

いいね
非同期処理でDBにinsertしたり、複数ファイルをstreamで読み込んで編集したりと使い方を考えるだけでワクワクする

次回の勉強会について

特に希望がなければOTPのbehaviorの1つであるGenServerを使って、stateを持つサーバーを実装してみようと思う
皆さん、お気づきかと思うが勉強会のテーマにしているものはこのブログで自身が学んだものをアウトプットするような形で選択している

開催日は9月20日(金) 19:30 ~ 21:30を予定している

参考文献