並行と並列と、という部分で参考になるドキュメントを発見。
機械翻訳に通してみたのですがそれ見つつで色々動作確認できるのかどうか。ざっくり控えを以下に。
整理
ええと
- concurrency が並行
- 複数のタスクを「進行中」にすることできるがどの時点でも CPU 上で一つのタスクしか実行されていない
- parallelism が並列
- 並列プログラムは複数の CPU コア上で複数のタスクを同時に実行できる
とのことで、このあたりはまだぱっと分からない鳥頭。
Elixir における並行、並列
- Elixir はランタイムレベルで並行している
- Elixir では並行コードと並列コードの間に区別はない
- 並行コードを書くことだけで利用可能なコアが複数ある場合は VM が自動的にそれを並列化する
Task.async_stream/3
async_stream(enumerable, function, options \\ [])
- enumerable の各項目で指定された関数を同時に実行するストリームを戻す
- default では生成されたプロセスの数は列挙可能なアイテムの数と同じ
以下な実装が例示されています。これを並行処理に、とのこと。
def frequency(texts, _workers) do
texts
|> get_all_graphemes()
|> count_letters()
end
defp get_all_graphemes(texts) do
texts
|> Enum.join()
|> String.graphemes()
end
defp count_letters(graphemes) do
Enum.reduce(graphemes, %{}, fn grapheme, acc ->
if String.match?(grapheme, ~r/^\p{L}$/u) do
downcased_letter = String.downcase(grapheme)
Map.update(acc, downcased_letter, 1, fn count -> count + 1 end)
else
acc
end
end)
end
Enum のナニな理解が若干微妙 (別途で確認します)。これを並行処理にするためには以下なヘルパー追加して
defp split_into_chunks(all_graphemes, num_chunks) do
all_graphemes_count = Enum.count(all_graphemes)
graphemes_per_chunk = :erlang.ceil(all_graphemes_count / num_chunks)
Enum.chunk_every(all_graphemes, graphemes_per_chunk)
end
defp merge_results_stream(results_stream) do
Enum.reduce(results_stream, %{}, fn {:ok, worker_result}, acc ->
Map.merge(acc, worker_result, fn _key, acc_val, worker_val ->
acc_val + worker_val
end)
end)
end
frequency/2
を以下に、とのこと。
def frequency(texts, workers) do
texts
|> get_all_graphemes()
|> split_into_chunks(workers)
|> Task.async_stream(&count_letters/1)
|> merge_results_stream()
end
ヘルパーについても別途確認してエントリ投入します。ちょっとばたばたで時間取れず。
機械翻訳な gist
以下です。