此代码终止,因为“进程试图在GenServer中的{:handletick,...}调用处理程序上调用自身”:
defmodule PeriodicSaver do
@moduledoc """
Functions for periodically saving streaming data, with minute resolutions,
into the database
"""
use GenServer
use Timex
def start_link() do
GenServer.start_link(__MODULE__, %{:valuemap => %{}}, name: :periodic_saver)
end
def init(state) do
IO.write "init PeriodicSaver"
{:ok, state}
end
def handle_info({:inmsg, msg}, state) do
# handles parsing of messages
case msg do
{:ok, %{"bstamp" => bstamp,
"ticker" => ticker,
"value" => value,
"time" => time,
"source" => source}} ->
GenServer.call self(), {:handle_tick, ticker, time, value}
_ ->
IO.write("ERROR")
end
{:noreply, state}
end
def handle_call({:handle_tick, ticker, time, value}, state) do
cond do
time > Timex.now ->
IO.puts "Timestamp > now error"
not Map.has_key?(state[:valuemap], ticker) ->
# first time this period we get this ticker
IO.puts time
IO.puts "helloooo"
state = put_in state, [:valuemap, ticker], %{}
state = put_in state, [:valuemap, ticker, :open], value
state = put_in state, [:valuemap, ticker, :open], value
state = put_in state, [:valuemap, ticker, :high], value
state = put_in state, [:valuemap, ticker, :low], value
state = put_in state, [:valuemap, ticker, :close], value
state = put_in state, [:valuemap, ticker, :latest_time], time
state = put_in state, [:valuemap, ticker, :next_minute], next_minute(time, 0, 1)
state[:valuemap][ticker][:latest_time] > time ->
# we got an out of order time
IO.puts "timestamp is lower than latest time for this ticker"
IO.inspect ticker
IO.inspect time
IO.inspect state[:valuemap][ticker]
state[:valuemap][ticker][:next_minute] < time ->
# flush to database and clear this entry from the map
IO.write "flushing this: "
IO.inspect state[:valuemap][ticker]
state = pop_in(state, [:valuemap, ticker]) |> elem(1)
true ->
"true found -> no conditions met"
end
{:reply, Nil, state}
end
def loop_next(x, current_time, every) do
# finds the next minute according to the conditions
cond do
Timex.to_unix(x) > Timex.to_unix(current_time) ->
x
true ->
loop_next Timex.add(x, Timex.Duration.from_minutes(every)),
current_time, every
end
end
def next_minute(current_time, start_minute, every) do
# next minute after which any aggregates must be saved
Timex.today
|> Timex.to_datetime
|> Timex.add(Timex.Duration.from_minutes(start_minute))
|> loop_next current_time, every
end
end
正如您所看到的,我有一个handle_call函数,其中包含{:handle_tick,...} atom,我在其send_info({:inmsg,msg})函数中从GenServer中调用。这是不允许的?我想这样做以一致的方式改变状态。我是否应该只写一个接收状态并输出新状态的普通函数?
GenServer.call
是同步的,因此它等待handle_call
的结果,但它无法处理,因为你当前的handle_info
没有返回,导致死锁。
为了避免这个问题(特别是,因为你不依赖于从这个调用返回的值),你可以把它变成组合send
和handle_info
。
考虑以下示例:
def handle_info({:inmsg, msg}, state) do
# handles parsing of messages
case msg do
{:ok, %{"bstamp" => bstamp,
"ticker" => ticker,
"value" => value,
"time" => time,
"source" => source}} ->
send(self(), {:handle_tick, ticker, time, value})
_ ->
IO.write("ERROR")
end
{:noreply, state}
end
# ...
def handle_info({:handle_tick, ticker, time, value}, state) do
# ...
end
希望有所帮助!
而不是让GenServer
向自己发送消息,你可以让它只是调用一个函数。
defmodule PeriodicSaver do
@moduledoc """
Functions for periodically saving streaming data, with minute resolutions,
into the database
"""
use GenServer
use Timex
def start_link() do
GenServer.start_link(__MODULE__, %{:valuemap => %{}}, name: :periodic_saver)
end
def init(state) do
IO.write "init PeriodicSaver"
{:ok, state}
end
def handle_info({:inmsg, msg}, state) do
# handles parsing of messages
case msg do
{:ok, %{"bstamp" => bstamp,
"ticker" => ticker,
"value" => value,
"time" => time,
"source" => source}} ->
do_tick(ticker, time, value, state)
_ ->
IO.write("ERROR")
end
{:noreply, state}
end
def handle_call({:handle_tick, ticker, time, value}, state) do
do_tick(ticker, time, value, state)
end
defp do_tick(ticker, time, value, state) do
cond do
time > Timex.now ->
IO.puts "Timestamp > now error"
not Map.has_key?(state[:valuemap], ticker) ->
# first time this period we get this ticker
IO.puts time
IO.puts "helloooo"
state = put_in state, [:valuemap, ticker], %{}
state = put_in state, [:valuemap, ticker, :open], value
state = put_in state, [:valuemap, ticker, :open], value
state = put_in state, [:valuemap, ticker, :high], value
state = put_in state, [:valuemap, ticker, :low], value
state = put_in state, [:valuemap, ticker, :close], value
state = put_in state, [:valuemap, ticker, :latest_time], time
state = put_in state, [:valuemap, ticker, :next_minute], next_minute(time, 0, 1)
state[:valuemap][ticker][:latest_time] > time ->
# we got an out of order time
IO.puts "timestamp is lower than latest time for this ticker"
IO.inspect ticker
IO.inspect time
IO.inspect state[:valuemap][ticker]
state[:valuemap][ticker][:next_minute] < time ->
# flush to database and clear this entry from the map
IO.write "flushing this: "
IO.inspect state[:valuemap][ticker]
state = pop_in(state, [:valuemap, ticker]) |> elem(1)
true ->
"true found -> no conditions met"
end
{:reply, Nil, state}
end
end
注意创建一个新的do_tick/4
函数。然而,这有可能在收到:inmsg
的同时完成。