Taskとは
Elixir
でプロセスを作成する方法はいろいろあります
以前はelixirで並列処理を使ってファイルを同時に開き特定の文字を検索するで
並列処理をやりましたが、その時は
spawn(module, :func, [argument)
こんな感じでspawn
メゾットを使ってプロセスを生成した
defmodule Sample do def hello do IO.puts("hello world") end end pid = spawn(Sample, :hello, []) #なにかしらのプロセス値が表示される iex> pid #PID<0.1873.0> #現在のプロセス値 iex> self() #PID<0.196.0>
こんな感じでプロセスが生成されているのが分かる
ただ毎度、シンプルな関数のためにプロセスを生成してメッセージを送信して受信して...
なんてのは単純にしんどいしコード量も増えて疲れる(ゲッソリ
そこでElixir
にはTask
というプロセスの生成が可能な
クソ便利なものが用意されているわけです
シンプルな関数を使用した並行処理がめちゃくちゃ簡単に書くことができる
利用のしやすさ
spawnとmessage << Task
Taskの使い方
まずはTask
に投げる用のサンプルモジュールを作成しておく
ただのサンプルなのでアホほどシンプルに。メゾットは引数同士を足し合わせるadd()
のみ
defmodule Sample do def add(x, y) do x + y end end
ではさっそくTask
を使ってプロセスを生成してみる
Task
の__Task.start()__
を使用する
引数についてはspawn
と全く同じだと思ってください
res = Task.start(Sample, :add, [1,2]) iex> res {:ok, #PID<0.1929.0>}
先ほどのspawn
メゾットと異なり戻り値はTask構造体
になっている
プロセスの生成に成功した場合にのみ{:ok, pid}
という形式で値が返ってくる
しかし、今の所Task.start()
を使って並列処理をしたことはありません
もっと便利な__Task.async()__
というメゾットがあります
task = Task.async(Sample, :add, [1,2]) %Task{ owner: #PID<0.196.0>, pid: #PID<0.1956.0>, ref: #Reference<0.838616553.1924923393.12356> }
Task.async()
でプロセスの生成に成功するとTask構造体
に含まれた以下の値が返ってくる
- 現在のプロセスpid
- 生成されたプロセスpid
- よくわからんやつ ->
spawn_link
で生成される値
この時点ではバックラウンドで処理が走っただけで結果の値は取得できていない
結果の値を取得するためにはTask.await()
を使う
Task.await(task) 3
この時点で生成したプロセスは役目を終えてさよならしてるので
もう一度、Task.await(task)
をcallするとerrorになる
Task.await(task) ** (exit) exited in: Task.await(%Task{owner: #PID<0.196.0>, pid: #PID<0.1989.0>, ref: #Reference<0.838616553.1924923393.12399>}, 5000) ** (EXIT) time out (elixir) lib/task.ex:577: Task.await/2
もしくはTask.yield(task, timeout_second)
というメゾットを使うこともできる
第2引数に渡したsecond分の時間が経過してもタスクが終了しない場合にはnil
を自動的に返してくれる
#再度プロセスを生成する task = Task.async(Sample, :add, [1,2]) iex> Task.yield(task, 1000) {:ok, 3} #もうプロセスは消滅しているがyieldだとerrorにならない #3秒だけ様子見。見つからなかったようだ iex(55)> Task.yield(task, 3000) nil
もう少し応用的な使い方
基礎の知識が身についたところでもう少し複雑な処理を行ってみる
まずは複数個の列挙可能データ(今回はレンジ)を用意して、1つのリストに格納する(リストinレンジ)
サイズは適当でok
data = [ 1..100, 1..101, 1..101, : ]
このdata内の各レンジに対してそれぞれプロセスを生成して
各要素を2倍した後に合計した物をreturnするという処理をTask
で行ってみる
まずは元となるモジュールと関数を用意
defmodule TaskPractice do def lst_adjudtment(lst) do lst |> Enum.map(&(&1 * 2)) |> Enum.sum() end end
この部分に関しては今更なので特に触れることはないかと
次に良しなにリストinレンジを用意する
iex> lists = Enum.reduce(1..10, [], fn x, acc -> acc ++ [1..x+100] end) [1..101, 1..102, 1..103, 1..104, 1..105, 1..106, 1..107, 1..108, 1..109, 1..110]
あとはいつものようにEnum.map()
を使えば上手く出来そう
リストinタスクたるものが出来上がる
#各要素のレンジをTask.asyncの第3引数に投げる(要素の数だけプロセスが生成される) iex> tasks = Enum.map(lists, &(Task.async(TaskPractice, :lst_adjudtment, [&1]))) [ %Task{ owner: #PID<0.200.0>, pid: #PID<0.216.0>, ref: #Reference<0.640011924.2942042115.104787> }, %Task{ owner: #PID<0.200.0>, pid: #PID<0.217.0>, ref: #Reference<0.640011924.2942042115.104788> }, : : %Task{ owner: #PID<0.200.0>, pid: #PID<0.225.0>, ref: #Reference<0.640011924.2942042115.104796> } ]
あとはTask.await
を使って値を取得すればok
ついでにパイプ使ってクールにまとめておく
iex> res = Enum.reduce(1..10, [], fn x, acc -> acc ++ [1..x+100] end) |> Enum.map(&(Task.async(TaskPractice, :lst_adjudtment, [&1]))) |> Enum.map(&(Task.await(&1))) [10302, 10506, 10712, 10920, 11130, 11342, 11556, 11772, 11990, 12210]
上手く行きましたね
以前はあれほどspawnで苦労した並列処理が
- Task.asyncでモジュールと関数と引数指定
- Task.awaitで値取得
のたった2ステップで完結しました。やったね
ちなみにですが、Task.async()
には関数だけを投げることも可能です
iex> task = Task.async(fn -> 3+3 end) %Task{ owner: #PID<0.200.0>, pid: #PID<0.239.0>, ref: #Reference<0.640011924.2942042113.105978> } iex> Task.await(task) 6
おまけのコーナー
いつぞやにつくった公開APIをcallするモジュールのメゾットを並列処理で動かしてみる
defmodule CallApi do def fetch_ghibli_films() do HTTPoison.get!("https://ghibliapi.herokuapp.com/films").body |> Poison.Parser.parse!() |> Enum.filter(&(&1["director"] == "Hayao Miyazaki")) |> Enum.map(&(&1["title"])) end end
try_total = 5 res = 1..try_total |> Enum.map(fn _ -> Task.async(CallApi, :fetch_ghibli_films, []) end) |> Enum.map(&(Task.await(&1))) res [ ["Castle in the Sky", "My Neighbor Totoro", "Kiki's Delivery Service", "Porco Rosso", "Princess Mononoke", "Spirited Away", "Howl's Moving Castle", "Ponyo", "The Wind Rises"], ["Castle in the Sky", "My Neighbor Totoro", "Kiki's Delivery Service", "Porco Rosso", "Princess Mononoke", "Spirited Away", "Howl's Moving Castle", "Ponyo", "The Wind Rises"], : : ["Castle in the Sky", "My Neighbor Totoro", "Kiki's Delivery Service", "Porco Rosso", "Princess Mononoke", "Spirited Away", "Howl's Moving Castle", "Ponyo", "The Wind Rises"] ]
秒で取得完了。えぐい