如何返回第一个异步任务完成

问题描述 投票:0回答:4

我有几个正在异步运行的任务。 根据输入的不同,一项或多项任务可能会运行很长时间,但只有一项任务会返回 :success 消息。

slowtask = Task.async(slow())
fasttask = Task.async(fast())

如何捕获上述两个任务中的第一个任务完成,而不必等待另一个任务?我已经尝试过

Task.find/2
,但因为它是用枚举实现的,所以它似乎在找到引用/消息之前等待所有退出信号。我的另一个想法是在
Stream.cycle
中进行轮询,忽略仍然存在的任务并捕获已退出的任务。 不过,以这种方式进行民意调查似乎不是什么灵丹妙药。

elixir
4个回答
9
投票

在 Elixir 上还没有简单的方法可以做到这一点。您最好的选择是,如果您仅在给定进程中等待这些消息,则如下所示:

  defmodule TaskFinder do
    def run do
      task1 = Task.async fn -> :timer.sleep(1000); 1 end
      task2 = Task.async fn -> :timer.sleep(5000); 2 end
      await [task1, task2]
    end

    # Be careful, this will receive all messages sent
    # to this process. It will return the first task
    # reply and the list of tasks that came second.
    def await(tasks) do
      receive do
        message ->
          case Task.find(tasks, message) do
            {reply, task} ->
              {reply, List.delete(tasks, task)}
            nil ->
              await(tasks)
          end
      end
    end
  end

  IO.inspect TaskFinder.run

请注意,您还可以使用此模式在 GenServer 中生成任务并使用

Task.find/2
查找匹配的任务。我也已将此示例添加到 Elixir 文档中。


3
投票

要获得第一个结果,您应该等待消息,然后将消息传递给

Task.find/2
,并处理第一个结果,其形式为
{task_result, task}

defmodule Tasks do
  def run do
    :random.seed(:os.timestamp)
    durations = Enum.shuffle(1..10)

    Enum.map(durations, fn(duration) -> Task.async(fn -> run_task(duration) end) end)
    |> get_first_result
    |> IO.inspect
  end

  defp get_first_result(tasks) do
    receive do
      msg ->
        case Task.find(tasks, msg) do
          {result, _task} ->
            # got the result
            result

          nil -> 
            # no result -> continue waiting
            get_first_result(tasks)
        end
    end
  end


  defp run_task(1) do
    :success
  end

  defp run_task(duration) do
    :timer.sleep(duration * 100)
    :ok
  end
end

如果“主”进程是

GenServer
,您应该从
Task.find/2
内部调用
handle_info/2
,而不是运行此递归循环。


2
投票

根据 Jose Valim 的回答,这是我用来匹配返回回复的内容:

def run do
    task1 = Task.async(fn -> :timer.sleep(10000); :slow end)
    task2 = Task.async(fn -> :timer.sleep(2000); :fail end)
    task3 = Task.async(fn -> :timer.sleep(1000); :fail end)

    await([task1, task2, task3])
end

def await(tasks) do
  receive do
    message ->
      case Task.find(tasks, message) do
        {:fail, task} ->
          await(List.delete(tasks, task))
        {reply, _task} ->
          reply             
        nil ->
          await(tasks)
      end
  end
end

这使我能够匹配第一个函数以返回除 :fail 原子之外的其他内容,并给我回复。 这是否有效,因为 receive/1 只是等待任何消息的到来?


0
投票

Task.find
已弃用。 使用
Task.yield_many
limit
1
。 当第一个任务通过
on_timeout: :kill_task
返回时,终止其他任务。 多个任务可能会在同一时间返回,所以选择第一个。

defmodule WaitOnFirstTask do
  def run do
    tasks =
      for i <- 1..10 do
        Task.async(fn ->
          Process.sleep(:timer.seconds(i))
          i
        end)
      end

    {_task, {:ok, 1}} =
      tasks
      |> Task.yield_many(limit: 1, on_timeout: :kill_task, timeout: :infinity)
      |> Enum.filter(fn
        {_task, {:ok, value}} -> true
        {_task, {:exit, _}} -> false
        {_task, nil} -> false
      end)
      |> List.first()
  end
end

WaitOnFirstTask.run()
© www.soinside.com 2019 - 2024. All rights reserved.