/var/log/messages

Apr 24, 2019 - 2 minute read - Comments - programming

Elixir, Flow

何となく elixir とか flow とかいうキーワードが耳に入ってきまして Google 先生から教えて頂いたドキュメントが以下。

とりあえず

以下の順で手続きが組み立てられていってます。

  • Read file in stream
  • Parse JSON string to Map data line by line
  • Get only the first name from the Map
  • Count how many times the same name appered
  • Sort in descending order of the count

パイプで作れる、というのが面白いですよね。自分メモを以下に。

とりあえずファイルを読む。

def map_reduce(file) do
  File.stream!(file)

これをパイプで渡すので Enum.map で云々。

def map_reduce(file) do
  File.stream!(file)
  |> Enum.map(&json_parse/1)

オブジェクト単位でパタンマッチして first: な値を取得なのか。

def map_reduce(file) do
  File.stream!(file)
  |> Enum.map(&json_parse/1)
  |> Enum.map(fn person ->
    %{name: %{first: first}} = person
    first
  end)

パタンマッチ、良いですよね。そしてさらにパイプで渡して accumulate なのかどうか。

def map_reduce(file) do
  File.stream!(file)
  |> Enum.map(&json_parse/1)
  |> Enum.map(fn person ->
    %{name: %{first: first}} = person
    first
  end)
  |> Enum.reduce(%{}, fn name, acc ->
    Map.update(acc, name, 1, & &1 + 1)
  end)

う、Map.update 忘れてるし。ここ、別途確認します。

もう少し

  • Eager (元の版)
  • Lazy
  • Flow

で特徴を確認しています。ちなみに Lazy では Enum.mapStream.map で置換しています。Enum は一度全部を処理するのか。対して Stream は名前の通りのアレですね。

ただし、両方とも single thread で動作、とのこと。

Flow

書き換えたら以下、とのこと。

def map_reduce(:flow, file) do    
  File.stream!(file)
  |> Flow.from_enumerable # added
  |> Flow.map(&json_parse/1) # Enum ==> Flow
  |> Flow.map(fn person -> # Enum ==> Flow
    %{name: %{first: first}} = person 
    first
  end)
  |> Flow.partition # added
  |> Flow.reduce(fn -> %{} end, fn name, acc -> # Enum ==> Flow
    Map.update(acc, name, 1, & &1 + 1)
  end)      
  |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)
end

File.stream の後からは並列で動作、とのこと。

宿題

  • Map.update の確認
  • Flow の諸々確認

稼動に余裕があれば上記確認の方向にて。あと、Flow の並列性についても確認してみたいです。

phoenix excersize (15) libcluster

comments powered by Disqus