/var/log/messages

Apr 10, 2019 - 2 minute read - Comments - programming

Concurrency and Parallelism in Elixir

並行と並列と、という部分で参考になるドキュメントを発見。

機械翻訳に通してみたのですがそれ見つつで色々動作確認できるのかどうか。ざっくり控えを以下に。

整理

ええと

  • concurrency が並行
    • 複数のタスクを「進行中」にすることできるがどの時点でも CPU 上で一つのタスクしか実行されていない
  • parallelism が並列
    • 並列プログラムは複数の CPU コア上で複数のタスクを同時に実行できる

とのことで、このあたりはまだぱっと分からない鳥頭。

Elixir における並行、並列

  • Elixir はランタイムレベルで並行している
  • Elixir では並行コードと並列コードの間に区別はない
  • 並行コードを書くことだけで利用可能なコアが複数ある場合は VM が自動的にそれを並列化する

Task.async_stream/3

async_stream(enumerable, function, options \\ [])
  • enumerable の各項目で指定された関数を同時に実行するストリームを戻す
  • default では生成されたプロセスの数は列挙可能なアイテムの数と同じ

以下な実装が例示されています。これを並行処理に、とのこと。

def frequency(texts, _workers) do
  texts
  |> get_all_graphemes()
  |> count_letters()
end

defp get_all_graphemes(texts) do
  texts
  |> Enum.join()
  |> String.graphemes()
end

defp count_letters(graphemes) do
  Enum.reduce(graphemes, %{}, fn grapheme, acc ->
    if String.match?(grapheme, ~r/^\p{L}$/u) do
      downcased_letter = String.downcase(grapheme)
      Map.update(acc, downcased_letter, 1, fn count -> count + 1 end)
    else
      acc
    end
  end)
end

Enum のナニな理解が若干微妙 (別途で確認します)。これを並行処理にするためには以下なヘルパー追加して

defp split_into_chunks(all_graphemes, num_chunks) do
  all_graphemes_count = Enum.count(all_graphemes)
  graphemes_per_chunk = :erlang.ceil(all_graphemes_count / num_chunks)

  Enum.chunk_every(all_graphemes, graphemes_per_chunk)
end

defp merge_results_stream(results_stream) do
  Enum.reduce(results_stream, %{}, fn {:ok, worker_result}, acc ->
    Map.merge(acc, worker_result, fn _key, acc_val, worker_val ->
      acc_val + worker_val
    end)
  end)
end

frequency/2 を以下に、とのこと。

def frequency(texts, workers) do
  texts
  |> get_all_graphemes()
  |> split_into_chunks(workers)
  |> Task.async_stream(&count_letters/1)
  |> merge_results_stream()
end

ヘルパーについても別途確認してエントリ投入します。ちょっとばたばたで時間取れず。

機械翻訳な gist

以下です。

OSX 端末 Enum、Map、String などの確認

comments powered by Disqus