やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

【実装コード有り】Elixirでpandasっぽいcsvファイルを触るモジュールを作ってみた

ゆるふわな動機

pythonのライブラリにpandasと呼ばれるものがある
厳密にはC言語で書かれているのでpythonかというとそうでもないが
このpandasが提供しているAPIにread_csvというものがあり
csv形式のファイルをdf形式で読み込みpandasでお気軽にゴリゴリと編集することが可能
(shif-jisで読み込んでいるのは日本語を含むcsvのため。defaultはutf-8だったはず)

df = pd.read_csv("sample_data.csv", encoding = "shift-jis")
print(type(df))

#pandas.core.frame.DataFrame

このdfに対して

  • この条件で~
  • この条件で~
  • こういうカラムを追加して~
  • csvに保存

なんてことをよくやる
テキトーにコードを妄想するとこんな感じになる

df = pd.read_csv("sample_data.csv", encoding = "shift-jis")
df = df[df["code"] == "00"]
df = df.query('year > 2010')
df.to_csv("sample_new.data.csv", header=True, index=False)

もしくはdf.assignを使ってゴリゴリやる方法もある
こちらのqiitaの記事が素晴らしいです
https://qiita.com/piroyoung/items/dd209801ca60a0b00c11qiita.com

で、結局何が言いたいの?って話なんですけど、このデータ処理の流れってまさに
Elixirのパイプ演算じゃんってことを思う訳ですね

csv
|> Aカラムを削除
|> Bカラムに空値があるレコードを除去
|> Bカラムの100以下の数値を持つレコードを除去
|> csv形式で.csvファイルを出力

あ、いい。ここに並列処理が加わると...あとは分かるね?

とりあえず関数作った

全体コードはこちら。レポジトリ名は気分でつけました。
github.com

一番良く行う操作 ==> 特定条件での抽出 なのでこの操作を完結できる関数を用意してみた
csvライブラリを使って.csvファイルを読み込んでいる
読み込んだ時点ではデータはStream形式になっているのでStreamを可能な限り保って値を変換するようにしている
課題は並列処理の実装をしていないこと

パターンマッチを使って3種類の条件渡しに対応した

  • 条件(bool値を返す)を関数を渡すパターン(is_function)
  • 複数の値(リスト)からOR検索を行うパターン(is_list)
  • 完全一致する値を渡すパターン(上記以外)

記述は可能な限り、一般化したつもりだが
補助関数に無名関数を渡してて若干可読性が悪いような気もする
すっきりしててElixirっぽくて良いかなと思ってますが
インデントについてはオレ流ですいません

defmodule CsvEditer do
  require CsvColumn
  
  # 関数を第3引数に受け取った場合
  def filter(cdl, column_name, func) when is_function(func) do
    # 引数で受け取った関数を実行する無名関数を作成
    stream_func = fn enum, index, ope_func -> Stream.filter(enum, &(ope_func.(Enum.at(&1, index)))) end
    _filter_helper(cdl, column_name, stream_func, func)
  end

  # 複数の値(リスト)を第3引数に受け取った場合
  def filter(cdl, column_name, val) when is_list(val) do
    # 引数で受け取ったリストの値を持つかどうかを確認する無名関数を作成
    stream_func = fn enum, index, val -> Stream.filter(enum, &(Enum.member?(val, Enum.at(&1, index)))) end
    _filter_helper(cdl, column_name, stream_func, val)
  end

  # 単純に値を第3引数に受け取った場合
  def filter(cdl, column_name, val) do
    # 引数で受け取った値と一致するかどうかを確認する無名関数を作成
    stream_func = fn enum, index, val -> Stream.filter(enum, &(Enum.at(&1, index) == val)) end
    _filter_helper(cdl, column_name, stream_func, val)
  end

  # 一般化された補助関数
  defp _filter_helper(cdl, column_name, stream_func, val) do
    # 受け取ったカラムがリストのどの位置(index)にあるかを検索
    index_num = CsvColumn.val_index_in_header(cdl, column_name)
    body =
      cdl
      |> Stream.drop(1) #headerを一時除去
      |> stream_func.(index_num, val) #受け取った関数を実行
    Stream.concat(Stream.take(cdl, 1), body) #headerをつけて返す
  end
end

CsvColumnモジュール については特に難しいことはしていない

defmodule CsvColumn
  def cdl_header_stream(cdl, false) do
    # header(リストinリストのhead)をリストで返す
    cdl
    |> Stream.take(1)
    |> Enum.to_list()
    |> List.first()
  end
  # Streamのまま返す
  def cdl_header_stream(cdl, _), do: cdl |> Stream.take(1)

  # header(リスト)から引数の値を検索してindex番号を返す
  def val_index_in_header(cdl, column_name) do
    cdl
    |> cdl_header_stream(false)
    |> Enum.find_index(fn val -> val == column_name end)
  end
end

今回、検証に使用したのはこちら
よくサンプルとして広く扱われるiris(ゆり)のデータです

紹介するのを忘れていましたが、こちらがcsvを読み込む関数

defmodule CsvReader do
  def read_csv(file_path) do
    case File.exists?(file_path) do
      true ->
        file_stream =
          File.stream!(file_path)
          |> CSV.decode
          |> Stream.map(fn row ->
            {_, value_lst} = row
            value_lst
          end)
        {:ok, file_stream}
      false ->
        {:error, "#{file_path}: no such file or directory"}
    end
  end
end

以下を目標にデータをゴリゴリする

  • varietyはSetosaのみを抽出
  • sepal.lengthが6.0以下のみを抽出
  • sepal.widthが3.4もしくは3.5の値のみを抽出

では、さっそく
(ちなみにcdlというのは「csv data list」から命名している)

$ iex -S mix

#成功すればstatusとstreamの値が帰ってくる
iex> {_status, cdl} = CsvReader.read_csv("iris.csv")
{:ok,
 #Stream<[
   enum: #Function<63.126435914/2 in Stream.transform/3>,
   funs: [#Function<49.126435914/1 in Stream.map/2>,
    #Function<49.126435914/1 in Stream.map/2>]
 ]>}

# 一回データの中身を見てみる(リストinリスト)
iex> cdl |> Enum.to_list
[
  ["sepal.length", "sepal.width", "petal.length", "petal.width", "variety"],
  ["5.1", "3.5", "1.4", ".2", "Setosa"],
  ["4.9", "3", "1.4", ".2", "Setosa"],
  ["4.7", "3.2", "1.3", ".2", "Setosa"],
  :
  :
  ["4.6", "3.1", "1.5", ".2", "Setosa"]
]

数値(float)への変換はブログ記事の簡略化のため、裏作業で行なった

[
  ["sepal.length", "sepal.width", "petal.length", "petal.width", "variety"],
  [5.1, 3.5, 1.4, 0.2, "Setosa"],
  [4.9, 3.0, 1.4, 0.2, "Setosa"],
  [4.7, 3.2, 1.3, 0.2, "Setosa"]
]

準備完了。あとはパイプで遊ぶだけ

iex> csv |> 
        CsvEditer.filter("variety", "Setosa") |>
        CsvEditer.filter("sepal.length", fn x -> x <= 6.0 end) |>
        CsvEditer.filter("sepal.width", [3.4, 3.5]) |>
        Enum.to_list
[
  ["sepal.length", "sepal.width", "petal.length", "petal.width", "variety"],
  [5.1, 3.5, 1.4, 0.2, "Setosa"],
  [4.6, 3.4, 1.4, 0.3, "Setosa"],
  [5.0, 3.4, 1.5, 0.2, "Setosa"],
  [4.8, 3.4, 1.6, 0.2, "Setosa"],
  [5.1, 3.5, 1.4, 0.3, "Setosa"],
  [5.4, 3.4, 1.7, 0.2, "Setosa"],
  [4.8, 3.4, 1.9, 0.2, "Setosa"],
  [5.0, 3.4, 1.6, 0.4, "Setosa"],
  [5.2, 3.5, 1.5, 0.2, "Setosa"],
  [5.2, 3.4, 1.4, 0.2, "Setosa"],
  [5.4, 3.4, 1.5, 0.4, "Setosa"],
  [5.5, 3.5, 1.3, 0.2, "Setosa"],
  [5.1, 3.4, 1.5, 0.2, "Setosa"],
  [5.0, 3.5, 1.3, 0.3, "Setosa"],
  [5.0, 3.5, 1.6, 0.6, "Setosa"]
]

おぉ...ええやんけ
パイプ演算子のおかげで何がしたいのかが一発で分かる
問題は速度。記事の文字数が多くなったため、別記事で検証してみようと思う

ただ、可読性はあきらかに上がった(個人的にね

レッツElixir