GenServer GenServer.call可以在Elixir中使用吗?

问题描述 投票:0回答:2

此代码终止,因为“进程试图在GenServer中的{:handletick,...}调用处理程序上调用自身”:

defmodule PeriodicSaver do
  @moduledoc """
  Functions for periodically saving streaming data, with minute resolutions, 
  into the database
  """

  use GenServer
  use Timex


  def start_link() do
    GenServer.start_link(__MODULE__, %{:valuemap => %{}}, name: :periodic_saver)
  end


  def init(state) do
    IO.write "init PeriodicSaver"
    {:ok, state}
  end


  def handle_info({:inmsg, msg}, state) do
    # handles parsing of messages
    case msg do
      {:ok, %{"bstamp" => bstamp,
              "ticker" => ticker,
              "value" => value, 
              "time" => time,
              "source" => source}} -> 
        GenServer.call self(), {:handle_tick, ticker, time, value}
      _ -> 
        IO.write("ERROR")
    end
    {:noreply, state}
  end


  def handle_call({:handle_tick, ticker, time, value}, state) do
    cond do
      time > Timex.now ->
        IO.puts "Timestamp > now error"
      not Map.has_key?(state[:valuemap], ticker) ->  
        # first time this period we get this ticker
        IO.puts time
        IO.puts "helloooo"
        state = put_in state, [:valuemap, ticker], %{}
        state = put_in state, [:valuemap, ticker, :open], value
        state = put_in state, [:valuemap, ticker, :open], value
        state = put_in state, [:valuemap, ticker, :high], value
        state = put_in state, [:valuemap, ticker, :low], value
        state = put_in state, [:valuemap, ticker, :close], value
        state = put_in state, [:valuemap, ticker, :latest_time], time
        state = put_in state, [:valuemap, ticker, :next_minute], next_minute(time, 0, 1)
      state[:valuemap][ticker][:latest_time] > time -> 
        # we got an out of order time
        IO.puts "timestamp is lower than latest time for this ticker"
        IO.inspect ticker
        IO.inspect time
        IO.inspect state[:valuemap][ticker]
      state[:valuemap][ticker][:next_minute] < time -> 
        # flush to database and clear this entry from the map
        IO.write "flushing this: "
        IO.inspect state[:valuemap][ticker]
        state = pop_in(state, [:valuemap, ticker]) |> elem(1)
      true -> 
        "true found -> no conditions met"
    end
    {:reply, Nil, state}
  end


  def loop_next(x, current_time, every) do
    # finds the next minute according to the conditions
    cond do
      Timex.to_unix(x) > Timex.to_unix(current_time) -> 
        x
      true -> 
        loop_next Timex.add(x, Timex.Duration.from_minutes(every)), 
          current_time, every
    end
  end


  def next_minute(current_time, start_minute, every) do
    # next minute after which any aggregates must be saved
    Timex.today
    |> Timex.to_datetime
    |> Timex.add(Timex.Duration.from_minutes(start_minute))
    |> loop_next current_time, every
  end


end

正如您所看到的,我有一个handle_call函数,其中包含{:handle_tick,...} atom,我在其send_info({:inmsg,msg})函数中从GenServer中调用。这是不允许的?我想这样做以一致的方式改变状态。我是否应该只写一个接收状态并输出新状态的普通函数?

elixir
2个回答
1
投票

GenServer.call是同步的,因此它等待handle_call的结果,但它无法处理,因为你当前的handle_info没有返回,导致死锁。

为了避免这个问题(特别是,因为你不依赖于从这个调用返回的值),你可以把它变成组合sendhandle_info

考虑以下示例:

def handle_info({:inmsg, msg}, state) do
# handles parsing of messages
  case msg do
  {:ok, %{"bstamp" => bstamp,
          "ticker" => ticker,
          "value" => value, 
          "time" => time,
          "source" => source}} -> 
    send(self(), {:handle_tick, ticker, time, value})
  _ -> 
    IO.write("ERROR")
 end
 {:noreply, state}
end

# ...

def handle_info({:handle_tick, ticker, time, value}, state) do
  # ...
end

希望有所帮助!


1
投票

而不是让GenServer向自己发送消息,你可以让它只是调用一个函数。

defmodule PeriodicSaver do
  @moduledoc """
  Functions for periodically saving streaming data, with minute resolutions, 
  into the database
  """

  use GenServer
  use Timex

  def start_link() do
    GenServer.start_link(__MODULE__, %{:valuemap => %{}}, name: :periodic_saver)
  end

  def init(state) do
    IO.write "init PeriodicSaver"
    {:ok, state}
  end

  def handle_info({:inmsg, msg}, state) do
    # handles parsing of messages
    case msg do
      {:ok, %{"bstamp" => bstamp,
              "ticker" => ticker,
              "value" => value, 
              "time" => time,
              "source" => source}} -> 
        do_tick(ticker, time, value, state)
      _ -> 
        IO.write("ERROR")
    end
    {:noreply, state}
  end


  def handle_call({:handle_tick, ticker, time, value}, state) do
    do_tick(ticker, time, value, state)
  end

  defp do_tick(ticker, time, value, state) do
    cond do
      time > Timex.now ->
        IO.puts "Timestamp > now error"
      not Map.has_key?(state[:valuemap], ticker) ->  
        # first time this period we get this ticker
        IO.puts time
        IO.puts "helloooo"
        state = put_in state, [:valuemap, ticker], %{}
        state = put_in state, [:valuemap, ticker, :open], value
        state = put_in state, [:valuemap, ticker, :open], value
        state = put_in state, [:valuemap, ticker, :high], value
        state = put_in state, [:valuemap, ticker, :low], value
        state = put_in state, [:valuemap, ticker, :close], value
        state = put_in state, [:valuemap, ticker, :latest_time], time
        state = put_in state, [:valuemap, ticker, :next_minute], next_minute(time, 0, 1)
      state[:valuemap][ticker][:latest_time] > time -> 
        # we got an out of order time
        IO.puts "timestamp is lower than latest time for this ticker"
        IO.inspect ticker
        IO.inspect time
        IO.inspect state[:valuemap][ticker]
      state[:valuemap][ticker][:next_minute] < time -> 
        # flush to database and clear this entry from the map
        IO.write "flushing this: "
        IO.inspect state[:valuemap][ticker]
        state = pop_in(state, [:valuemap, ticker]) |> elem(1)
      true -> 
        "true found -> no conditions met"
    end
    {:reply, Nil, state}
  end
end

注意创建一个新的do_tick/4函数。然而,这有可能在收到:inmsg的同时完成。

© www.soinside.com 2019 - 2024. All rights reserved.