やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

elixirで並列処理を使ってファイルを同時に開き特定の文字を検索する

あいかわらず長いタイトル

どういうことかというと

./file/file1.txt
./file/file2.txt
:
./file/file99.txt

file(n).txtには単純に文字が書いてあるだけです
こんな感じですかね

ppppppcatpppppppppppppp
ppppppcatpppppppppppppp
ppppppcatpppppppppppppp
ppppppcatpppppppppppppp

対象のディレクトリ内のファイルを全てを並列処理で各プロセスで開き
特定の文字列を開いたファイルから検索し、何個含まれているかをカウントする
このファイルの場合であれば「cat」を探すとして結果としては「4」が返ってくる
これを並列処理でゴリゴリっとして複数ファイルに対してカウントを行い

[4, 5, 20, 45, 2]

ってことがしたい
この知見があればcsvファイルを並列で読み込んでゴリゴリ加工していくなんてことが気軽にできるはず...
elixirの強さが生きてきますね

プロセスを生成する

正直なところ、他の言語で並列処理をほとんど触ったことがなくて(pythonで少しだけ)
すんませんが並列処理に関する知識・知見はほぼゼロです
プロセスやらスレッドやらを生成してゴリゴリということは何となく分かってます
elixirではerlangでサポートされているプロセスを使ってるようで

  • 軽い
  • 速い
  • 負荷が少ない
  • すべてのCPUで使用可能

とのことで強すぎるわ
また、elixirではプロセスを作るのがアホほど簡単でデフォルトのリミットを外してさえあげれば
低スペックなPCでも10万とか100万って数のプロセスを余裕で生成できます

プロセスの生成の方法は色々ありますが最もシンプルな

spawn(module, function, [arguments])

に触れておきます
spawn使うとプロセスを生成してくれます
はい。これだけです

サンプルとして引数にはこんな感じで渡す

defmodule Sample do
  def greet(msg) do
    IO.puts(msg)
  end
end

#モジュール名と関数名(アトムで渡す)
#関数に渡したい引数は第3引数に配列に内包して渡す
spawn(Sample, :greet, ["ossu"])

#result:
#ossu
#PID<0.42.0>

spawnはPIDという値をリターンします
ずっとProcessIDの略語だと思ってたんですけど
elixirの公式ページによると「Process Identifier(プロセス認識子)」だそうです

sendとreceive

elixirの並列処理ではアクターモデルたるものを採用しています
アクターモデルってのをクソ程ざっくり説明すると

アクターモデル
それぞれのプロセス同士で何も共有しない
お互いにメッセージ送り合って色々やるで~

これはいいですね。
今あのプロセスはこの状態で...あっちのプロセスはこういう状態で...
ってなことを一切考える必要がありません
お互いのプロセスでメッセージを送受信するだけでいいので

しかもプロセス同士でメッセージ送受信するのもクソ簡単です
メッセージの送信

pid = spawn(Sample, :greet, ["オラオラオラオラ"])
send pid, {self(), "send message!"}

spawn関数で生成したPIDを使用して
{送信元, メッセージ}を用意してあげればok
またself()もしくはself(elixir1.8ではwarning)を使うことで現在のプロセスのPIDを返してくれます
AプロセスからBプロセスへメッセージを送信ってことをしているわけです

メッセージの受信

defmodule Sample do
  def greet do
    receive do
      {sender, message} -> IO.puts(message)
    end
  end
end

spawnの中で指定したモジュールの関数内にreceiveを使う
さらにreceiveブロックの中でパターンマッチを行いメッセージを受信する

送信元にメッセージを送り返す

defmodule Sample do
  def greet do
    receive do
      {sender, message} ->
        send sender, {:catch, "catch: #{message}"}
    end
  end
end
 
pid = spawn(Sample, :greet, [])
send pid, {self(), "send message!"}

receive do
  {:catch, message} -> IO.puts(message)
end

モジュール内の関数でrecieveした後にsendを再び使って送信元に再びメッセージを送り返します
さらにこのモジュール内の関数からsendで送信したメッセージを送信元で受信することができます

特定の文字をカウントする関数

ここはメインの部分ではないのでざっくりと
再帰関数を使ってファイル内の文字列を置き換える毎にaccumlatorに+1して
置き換えられなくなった時点(false)で終了し、最終的なaccumlatorをリターンします

defmodule FinderStrInFile do
  def total_count(target_str, file_path) do
    File.read!(file_path)
      |> _total_count(target_str, 0)
  end

  defp _total_count(file_str, target_str, accum) do
    key = String.contains?(file_str, target_str)
    case key do
      true ->
        String.replace(file_str, target_str, "", global: false)
          |> _total_count(target_str, accum+1)
      false -> accum
    end
  end
end

FinderStrInFileモジュールでのメッセージの送受信

プログラミングelixirの第14章をリスペクトしながら送受信の部分を作成していきます
送信されたメッセージをやりとりするためにモジュール内にfind関数を用意します

def find(scheduler) do
    #いつでもいけます
    send scheduler, { :ready, self() }
    receive do
      { :find, file_path, target_str, client } ->
        IO.puts("--> #{file_path}")
        send client, { :answer, total_count(target_str, file_path), self() }
        #再帰呼び出し
        find(scheduler)
      { :shutdown } -> exit(:normal)
    end
  end

こん関数はcallされた時に送信元に{ :ready, self() }(アトム, 現在のPID)を送信します
こちら側はいつでもファイル探せる準備できてますよ〜ってことを知らせるためにメッセージを送るわけです

:findについて

{ :find, file_path, target_str, client } ->
  IO.puts("--> #{file_path}")
  send client, { :answer, total_count(target_str, file_path), self() }
  find(scheduler)

もちろんメッセージの受信部分が必要になるのでreceive節も用意しています
送信元から{ :find, ... }のデータを送ることでデータの内の文字のカウントを開始します
文字のカウントが終了した際には送信元に{ :answer, #結果, pid }を返し
処理が終了したことと、集計の結果を送信します
また、上記の処理が次回以降も行われるようにパターンマッチの中でfind自信をcallして再帰させています

:shutdownについて

{ :shutdown } -> exit(:normal)

もう処理する必要がなくなった際に処理を停止させるために{ :shutdown }を用意してあります
:findのパターンマッチではfind自信をcallして再帰していましたが
:shudownの方ではもう処理を継続させる必要がないのでプロセスの終了を行います

#引数でプロセスの終了の仕方のモードを変更可能
exit(:normal)

ようやく処理を行うメインのモジュールの全体が完成しました

defmodule FinderStrInFile do
  def find(scheduler) do
    #処理開始の準備ができたことを送信元に返信
    send scheduler, { :ready, self() }
    receive do
       
      { :find, file_path, target_str, client } ->
        IO.puts("--> #{file_path}")
        send client, { :answer, total_count(target_str, file_path), self() }
        find(scheduler)
      { :shutdown } -> exit(:normal)
    end
  end

  def total_count(target_str, file_path) do
    File.read!(file_path)
      |> _total_count(target_str, 0)
  end

  defp _total_count(file_str, target_str, accum) do
    key = String.contains?(file_str, target_str)
    case key do
      true ->
        String.replace(file_str, target_str, "", global: false)
          |> _total_count(target_str, accum+1)
      false -> accum
    end
  end
end

Schedulerの作成

この部分はプログラミングelixirの14章をほぼオマージュしています
プロセスの生成と対象モジュールにデータを送受信の管理部分を担当させます

Schedulerには2つの関数を用意します

  • run関数: プロセスを生成する
  • schedule_processes関数: メッセージの送受信の管理を行う

run関数はシンプルで受け取った数値の合計分、プロセスをEnum.mapを使って生成するだけです
spanwの戻り値であるPIDをschedule_processesの第1引数に渡して並列処理を開始させます
ついでに受け取った引数も渡します

def run(num_processes, module, func, file_list, target_str) do
    #受け取った数値分のレンジを生成
    (1..num_processes)
      #Enum.mapを使って数値分のプロセスを生成
      |> Enum.map(fn(_) -> spawn(module, func, [self()]) end)
      |> schedule_processes(file_list, target_str, [])
  end

schedule_processes関数は少し複雑ですが、別に難しいことはなんもやってません
あるのはreceive節のみでホンマにメッセージの送受信管理をしてるだけです
schedule_processesで管理しているメッセージは

  • :ready
  • :find
  • :shutdown
  • :answer

の4つです。すでにfind関数で触れているものなので各説明はしませんが
find関数から送信された:readyの合図を受信して処理を開始します
第2引数にファイルのパスがリストに格納されており、この第2引数に対して
ガード節を用いてパターンマッチさせています
リストから要素を取り出せる限り、schedule_processesを再帰させて:findを送信し続けます
空になった時点で:shutdownを送信して処理を終了するようにします

defmodule Scheduler do
  def schedule_processes(process, file_list, target_str, accum) do
    receive do
      #準備完了のメッセージを受信。リストが空でなければマッチ
      { :ready, pid } when file_list != [] ->
        #ヘッドとテイルを使って先頭の要素を取得
        [ head | tail ] = file_list
        #対象のモジュールに:findを送信
        send pid, { :find, head, target_str, self() }
        #再帰を行う
        schedule_processes(process, tail, target_str, accum)

      #準備完了のメッセージを受信。リストが空ならマッチ
      { :ready, pid } ->
        #対象のモジュールに終了を伝えるために:shutdownを送信
        send pid, { :shutdown }
          #ここが正直よくわかってない
          if length(process) > 1 do
            schedule_processes(List.delete(process, pid), file_list, target_str, accum) 
          else
            IO.inspect(accum)
          end
      #:answerを受け取りaccumlatorに結果を追加する
      { :answer, total_count, _pid } ->
        schedule_processes(process, file_list, target_str, [total_count | accum])
    end
  end
end

プログラミングelixir第14章を参考に書きましたが、この部分がまだよく分かってない...
Enum.mapとspawnで生成したPIDのリストを下記のようにしていくのは分かる
なぜなのか。並行して調べてるので何か分かり次第、追記します

[#PID<0.374.0>, #PID<0.375.0>, #PID<0.376.0>, #PID<0.377.0>]
[#PID<0.375.0>, #PID<0.376.0>, #PID<0.377.0>]
[#PID<0.375.0>, #PID<0.376.0>]
[#PID<0.375.0>]
#何の意味があるのかよく分からない。バグの対処とあるが....
if length(process) > 1 do
  schedule_processes(List.delete(process, pid), file_list, target_str, accum)
else
  IO.inspect(accum)
end

最後の最後に実行部分の作成

ファイルの数だけプロセスを生成してFinderStrInFileのfind関数を呼び出すようにします

#ファイルを読み込みパスを生成
files = File.ls!("./data")
  |> Enum.map(&("./data/" <> &1)) 

#ファイルの数分プロセスを生成するためにカウント
make_process_num = Enum.count(files)

#ファイルパスとファイル数。対象のモジュールと関数と引数を一気に渡す
result = Scheduler.run(make_process_num, FinderStrInFile, :find, files, "cat")

実行結果

IO.inspect(result)

#result:
#並列にIO.putsが実行されている!!
# --> ./data/str3.txt
# --> ./data/str2.txt
# --> ./data/str1.txt
# --> ./data/str5.txt
# --> ./data/str4.txt
# [20, 2, 5, 10, 20]
# [20, 2, 5, 10, 20]
# [Scheduler, FinderStrInFile]

無事にファイルの文字数がカウントされている!!!!!
# result = [20, 2, 5, 10, 20]

まとめ

とりあえず、やりたいことはできたと思う
TaskとAgentたるものを使用することでもっと楽にできるらしいです
アホほど長たらしくなってしまい、すんません
並列処理はelixirの核となる部分なのでもっと知見貯めたい
間違いなどあればコメントで指摘頂ければありがたいです