我有几个正在异步运行的任务。 根据输入的不同,一项或多项任务可能会运行很长时间,但只有一项任务会返回 :success 消息。
slowtask = Task.async(slow())
fasttask = Task.async(fast())
如何捕获上述两个任务中的第一个任务完成,而不必等待另一个任务?我已经尝试过
Task.find/2
,但因为它是用枚举实现的,所以它似乎在找到引用/消息之前等待所有退出信号。我的另一个想法是在 Stream.cycle
中进行轮询,忽略仍然存在的任务并捕获已退出的任务。 不过,以这种方式进行民意调查似乎不是什么灵丹妙药。
在 Elixir 上还没有简单的方法可以做到这一点。您最好的选择是,如果您仅在给定进程中等待这些消息,则如下所示:
defmodule TaskFinder do
def run do
task1 = Task.async fn -> :timer.sleep(1000); 1 end
task2 = Task.async fn -> :timer.sleep(5000); 2 end
await [task1, task2]
end
# Be careful, this will receive all messages sent
# to this process. It will return the first task
# reply and the list of tasks that came second.
def await(tasks) do
receive do
message ->
case Task.find(tasks, message) do
{reply, task} ->
{reply, List.delete(tasks, task)}
nil ->
await(tasks)
end
end
end
end
IO.inspect TaskFinder.run
请注意,您还可以使用此模式在 GenServer 中生成任务并使用
Task.find/2
查找匹配的任务。我也已将此示例添加到 Elixir 文档中。
要获得第一个结果,您应该等待消息,然后将消息传递给
Task.find/2
,并处理第一个结果,其形式为{task_result, task}
。
defmodule Tasks do
def run do
:random.seed(:os.timestamp)
durations = Enum.shuffle(1..10)
Enum.map(durations, fn(duration) -> Task.async(fn -> run_task(duration) end) end)
|> get_first_result
|> IO.inspect
end
defp get_first_result(tasks) do
receive do
msg ->
case Task.find(tasks, msg) do
{result, _task} ->
# got the result
result
nil ->
# no result -> continue waiting
get_first_result(tasks)
end
end
end
defp run_task(1) do
:success
end
defp run_task(duration) do
:timer.sleep(duration * 100)
:ok
end
end
如果“主”进程是
GenServer
,您应该从 Task.find/2
内部调用 handle_info/2
,而不是运行此递归循环。
根据 Jose Valim 的回答,这是我用来匹配返回回复的内容:
def run do
task1 = Task.async(fn -> :timer.sleep(10000); :slow end)
task2 = Task.async(fn -> :timer.sleep(2000); :fail end)
task3 = Task.async(fn -> :timer.sleep(1000); :fail end)
await([task1, task2, task3])
end
def await(tasks) do
receive do
message ->
case Task.find(tasks, message) do
{:fail, task} ->
await(List.delete(tasks, task))
{reply, _task} ->
reply
nil ->
await(tasks)
end
end
end
这使我能够匹配第一个函数以返回除 :fail 原子之外的其他内容,并给我回复。 这是否有效,因为 receive/1 只是等待任何消息的到来?
Task.find
已弃用。 使用 Task.yield_many
且 limit
为 1
。 当第一个任务通过 on_timeout: :kill_task
返回时,终止其他任务。 多个任务可能会在同一时间返回,所以选择第一个。
defmodule WaitOnFirstTask do
def run do
tasks =
for i <- 1..10 do
Task.async(fn ->
Process.sleep(:timer.seconds(i))
i
end)
end
{_task, {:ok, 1}} =
tasks
|> Task.yield_many(limit: 1, on_timeout: :kill_task, timeout: :infinity)
|> Enum.filter(fn
{_task, {:ok, value}} -> true
{_task, {:exit, _}} -> false
{_task, nil} -> false
end)
|> List.first()
end
end
WaitOnFirstTask.run()