何となく elixir とか flow とかいうキーワードが耳に入ってきまして Google 先生から教えて頂いたドキュメントが以下。
とりあえず
以下の順で手続きが組み立てられていってます。
- Read file in stream
- Parse JSON string to Map data line by line
- Get only the first name from the Map
- Count how many times the same name appered
- Sort in descending order of the count
パイプで作れる、というのが面白いですよね。自分メモを以下に。
とりあえずファイルを読む。
def map_reduce(file) do
File.stream!(file)
これをパイプで渡すので Enum.map
で云々。
def map_reduce(file) do
File.stream!(file)
|> Enum.map(&json_parse/1)
オブジェクト単位でパタンマッチして first:
な値を取得なのか。
def map_reduce(file) do
File.stream!(file)
|> Enum.map(&json_parse/1)
|> Enum.map(fn person ->
%{name: %{first: first}} = person
first
end)
パタンマッチ、良いですよね。そしてさらにパイプで渡して accumulate なのかどうか。
def map_reduce(file) do
File.stream!(file)
|> Enum.map(&json_parse/1)
|> Enum.map(fn person ->
%{name: %{first: first}} = person
first
end)
|> Enum.reduce(%{}, fn name, acc ->
Map.update(acc, name, 1, & &1 + 1)
end)
う、Map.update
忘れてるし。ここ、別途確認します。
もう少し
- Eager (元の版)
- Lazy
- Flow
で特徴を確認しています。ちなみに Lazy では Enum.map
を Stream.map
で置換しています。Enum は一度全部を処理するのか。対して Stream は名前の通りのアレですね。
ただし、両方とも single thread で動作、とのこと。
Flow
書き換えたら以下、とのこと。
def map_reduce(:flow, file) do
File.stream!(file)
|> Flow.from_enumerable # added
|> Flow.map(&json_parse/1) # Enum ==> Flow
|> Flow.map(fn person -> # Enum ==> Flow
%{name: %{first: first}} = person
first
end)
|> Flow.partition # added
|> Flow.reduce(fn -> %{} end, fn name, acc -> # Enum ==> Flow
Map.update(acc, name, 1, & &1 + 1)
end)
|> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)
end
File.stream
の後からは並列で動作、とのこと。
宿題
- Map.update の確認
- Flow の諸々確認
稼動に余裕があれば上記確認の方向にて。あと、Flow の並列性についても確認してみたいです。