やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

【サンプルコード有り】ElixirでTaskを使った簡単な並行処理の実装方法

Taskとは

Elixirでプロセスを作成する方法はいろいろあります
以前はelixirで並列処理を使ってファイルを同時に開き特定の文字を検索する
並列処理をやりましたが、その時は

spawn(module, :func, [argument)

こんな感じでspawnメゾットを使ってプロセスを生成した

defmodule Sample do
  def hello do
    IO.puts("hello world")
  end
end

pid = spawn(Sample, :hello, [])
#なにかしらのプロセス値が表示される
iex> pid
#PID<0.1873.0>

#現在のプロセス値
iex> self()
#PID<0.196.0>

こんな感じでプロセスが生成されているのが分かる

ただ毎度、シンプルな関数のためにプロセスを生成してメッセージを送信して受信して...
なんてのは単純にしんどいしコード量も増えて疲れる(ゲッソリ

そこでElixirにはTaskというプロセスの生成が可能な
クソ便利なものが用意されているわけです
シンプルな関数を使用した並行処理がめちゃくちゃ簡単に書くことができる

利用のしやすさ

spawnとmessage << Task

Taskの使い方

まずはTaskに投げる用のサンプルモジュールを作成しておく
ただのサンプルなのでアホほどシンプルに。メゾットは引数同士を足し合わせるadd()のみ

defmodule Sample do
  def add(x, y) do
    x + y
  end
end

ではさっそくTaskを使ってプロセスを生成してみる
Task__Task.start()__を使用する
引数についてはspawnと全く同じだと思ってください

res = Task.start(Sample, :add, [1,2])
iex> res
{:ok, #PID<0.1929.0>}

先ほどのspawnメゾットと異なり戻り値はTask構造体になっている
プロセスの生成に成功した場合にのみ{:ok, pid}という形式で値が返ってくる

しかし、今の所Task.start()を使って並列処理をしたことはありません
もっと便利な__Task.async()__というメゾットがあります

task = Task.async(Sample, :add, [1,2])
%Task{
  owner: #PID<0.196.0>,
  pid: #PID<0.1956.0>,
  ref: #Reference<0.838616553.1924923393.12356>
}

Task.async()でプロセスの生成に成功するとTask構造体に含まれた以下の値が返ってくる

  • 現在のプロセスpid
  • 生成されたプロセスpid
  • よくわからんやつ -> spawn_linkで生成される値

この時点ではバックラウンドで処理が走っただけで結果の値は取得できていない
結果の値を取得するためにはTask.await()を使う

Task.await(task)
3

この時点で生成したプロセスは役目を終えてさよならしてるので
もう一度、Task.await(task)をcallするとerrorになる

Task.await(task)
** (exit) exited in: Task.await(%Task{owner: #PID<0.196.0>, pid: #PID<0.1989.0>, ref: #Reference<0.838616553.1924923393.12399>}, 5000)
    ** (EXIT) time out
    (elixir) lib/task.ex:577: Task.await/2

もしくはTask.yield(task, timeout_second)というメゾットを使うこともできる
第2引数に渡したsecond分の時間が経過してもタスクが終了しない場合にはnilを自動的に返してくれる

#再度プロセスを生成する
task = Task.async(Sample, :add, [1,2])

iex> Task.yield(task, 1000)
{:ok, 3}

#もうプロセスは消滅しているがyieldだとerrorにならない
#3秒だけ様子見。見つからなかったようだ
iex(55)> Task.yield(task, 3000)
nil

もう少し応用的な使い方

基礎の知識が身についたところでもう少し複雑な処理を行ってみる
まずは複数個の列挙可能データ(今回はレンジ)を用意して、1つのリストに格納する(リストinレンジ)
サイズは適当でok

data = [
  1..100,
  1..101,
  1..101,
  :
]

このdata内の各レンジに対してそれぞれプロセスを生成して
各要素を2倍した後に合計した物をreturnするという処理をTaskで行ってみる

まずは元となるモジュールと関数を用意

defmodule TaskPractice do
  def lst_adjudtment(lst) do
    lst
      |> Enum.map(&(&1 * 2))
      |> Enum.sum()
  end
end

この部分に関しては今更なので特に触れることはないかと
次に良しなにリストinレンジを用意する

iex> lists = Enum.reduce(1..10, [], fn x, acc -> acc ++ [1..x+100] end)
[1..101, 1..102, 1..103, 1..104, 1..105, 1..106,
 1..107, 1..108, 1..109, 1..110]

あとはいつものようにEnum.map()を使えば上手く出来そう
リストinタスクたるものが出来上がる

#各要素のレンジをTask.asyncの第3引数に投げる(要素の数だけプロセスが生成される)
iex> tasks = Enum.map(lists, &(Task.async(TaskPractice, :lst_adjudtment, [&1])))
[
  %Task{
    owner: #PID<0.200.0>,
    pid: #PID<0.216.0>,
    ref: #Reference<0.640011924.2942042115.104787>
  },
  %Task{
    owner: #PID<0.200.0>,
    pid: #PID<0.217.0>,
    ref: #Reference<0.640011924.2942042115.104788>
 },
  :
  :
  %Task{
    owner: #PID<0.200.0>,
    pid: #PID<0.225.0>,
    ref: #Reference<0.640011924.2942042115.104796>
  }
]

あとはTask.awaitを使って値を取得すればok
ついでにパイプ使ってクールにまとめておく

iex> res = Enum.reduce(1..10, [], fn x, acc -> acc ++ [1..x+100] end)
          |> Enum.map(&(Task.async(TaskPractice, :lst_adjudtment, [&1])))
          |> Enum.map(&(Task.await(&1)))

[10302, 10506, 10712, 10920, 11130, 11342, 11556, 11772, 11990, 12210]

上手く行きましたね
以前はあれほどspawnで苦労した並列処理が

  • Task.asyncでモジュールと関数と引数指定
  • Task.awaitで値取得

のたった2ステップで完結しました。やったね
ちなみにですが、Task.async()には関数だけを投げることも可能です

iex> task = Task.async(fn -> 3+3 end) 
%Task{
  owner: #PID<0.200.0>,
  pid: #PID<0.239.0>,
  ref: #Reference<0.640011924.2942042113.105978>
}

iex> Task.await(task)
6

おまけのコーナー

いつぞやにつくった公開APIをcallするモジュールのメゾットを並列処理で動かしてみる

defmodule CallApi do
  def fetch_ghibli_films() do
    HTTPoison.get!("https://ghibliapi.herokuapp.com/films").body
      |> Poison.Parser.parse!()
      |> Enum.filter(&(&1["director"] == "Hayao Miyazaki"))
      |> Enum.map(&(&1["title"]))
  end
end
try_total = 5 
res = 1..try_total
  |> Enum.map(fn _ -> Task.async(CallApi, :fetch_ghibli_films, []) end)
  |> Enum.map(&(Task.await(&1)))

res
[
  ["Castle in the Sky", "My Neighbor Totoro",
   "Kiki's Delivery Service", "Porco Rosso",
   "Princess Mononoke", "Spirited Away",
   "Howl's Moving Castle", "Ponyo",
   "The Wind Rises"],
  ["Castle in the Sky", "My Neighbor Totoro",
   "Kiki's Delivery Service", "Porco Rosso",
   "Princess Mononoke", "Spirited Away",
   "Howl's Moving Castle", "Ponyo",
   "The Wind Rises"],
  :
  :
  ["Castle in the Sky", "My Neighbor Totoro",
   "Kiki's Delivery Service", "Porco Rosso",
   "Princess Mononoke", "Spirited Away",
   "Howl's Moving Castle", "Ponyo",
   "The Wind Rises"]
]

秒で取得完了。えぐい