/var/log/messages

Jun 11, 2019 - 2 minute read - Comments - programming

GenStage

以下、確認しつつ控えを以降に。

三種類の役割が用意されている、とのこと。

  • :producer
  • :producer_consumer
  • :consumer

producer は最速を待っている、とのこと。

  • バックプレッシャー : consumer の処理が busy の時に過剰なプレッシャーがかからないようにする仕組み

作成着手

とりあえず Google Cloud Shell に接続。事前準備として以下なスクリプト実行。

#/bin/bash

sudo dpkg -i ~/erlang-solutions_1.0_all.deb
sudo apt-get update
sudo apt-get install esl-erlang -fy
sudo apt-get install elixir -fy

で、以下を実行。

$ mix new genstage_example --sup
$ cd genstage_example

で、依存ライブラリに gen_stage 追加して以下。

$ mix do deps.get, compile
Resolving Hex dependencies...
Dependency resolution completed:
New:
  gen_stage 0.14.1
* Getting gen_stage (Hex package)
==> gen_stage
Compiling 10 files (.ex)
Generated gen_stage app
==> genstage_example
Compiling 2 files (.ex)
Generated genstage_example app

これで producer を云々する用意ができた、とのこと。

producer

第一歩は producer の作成、とのこと。ファイルを作成して

$ mkdir lib/genstage_example
$ touch lib/genstage_example/producer.ex

ディレクトリは既に存在。producer.ex を以下に。

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end
  • init/1 で初期状態をセットして自分自身を :producer とラベル付け
  • handle_demand/2 は全ての producer が実装しなければならない

バックプレッシャー?

なんかよくわからない。以下を確認した方が良さげ。

とりあえず

  • handle_demand/2 が戻すのは { :noreply, 結果のリスト, 次回の開始値 } とのこと

や、handle_demand に渡される第一引数の demand の出所がわからんぞ。あ、次回の期待値か、むむむむ。

わかった

とりあえず実装すすめてみます

producer_consumer

作成。

$ touch lib/genstage_example/producer_consumer.ex

で、中身が以下。

defmodule GenstageExample.ProducerConsumer do
  use GenStage

  require Integer

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.filter(&Integer.is_even/1)

    {:noreply, numbers, state}
  end
end

handle_events/3 で操作して次に渡しています。

consumer

以下で作成。

$ touch lib/genstage_example/consumer.ex

中身が以下。

defmodule GenstageExample.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # 消費者としては二度とイベントを出力しない
    {:noreply, [], state}
  end
end

application.ex

以下。ここがポイントでした。

def start(_type, _args) do
  import Supervisor.Spec, warn: false

  children = [
    worker(GenstageExample.Producer, [0]),
    worker(GenstageExample.ProducerConsumer, []),
    worker(GenstageExample.Consumer, [])
  ]

  opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
  Supervisor.start_link(children, opts)
end

ここで worker で指定してるソレが最初の期待値なのか。

Google Cloud Shell の C-c

C-c で良いのかどうかorz

以下、別途確認の方向にて

書いて覚える? Phrasal verbs (2)

comments powered by Disqus