F# 如何运行多个异步任务并等待第一个完成的结果?

问题描述 投票:0回答:6

有很多如何在 f# 中执行异步任务的示例

[dowork 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously

但是我如何才能异步等待第一个结果呢?

例如,如果我想运行一些并行查找任务,并且我想在获得第一个成功结果时进行更深入的搜索。 (基本上相当于 C# 世界中的

Task.WhenAny
...)

asynchronous f#
6个回答
7
投票

我会用类似的东西:

let any asyncs =
    async {
        let t = 
            asyncs
            |> Seq.map Async.StartAsTask
            |> System.Threading.Tasks.Task.WhenAny
        return t.Result.Result }

5
投票

可以在以下代码片段中找到通用解决方案:http://fssnip.net/dN

Async.Choice
可以嵌入到任何异步工作流程中,就像
Async.Parallel
一样。可选的输出类型编码了子计算完成但没有令人满意的结果的可能性。


3
投票

我能想到的最简单的实现看起来像这样:

open FSharp.Control

let getOneOrOther () =
    let queue = BlockingQueueAgent(1)
    let async1 = async {
            do! Async.Sleep (System.Random().Next(1000, 2000))
            do! queue.AsyncAdd(1) } |> Async.Start
    let async2 = async {
            do! Async.Sleep (System.Random().Next(1000, 2000))
            do! queue.AsyncAdd(2) } |> Async.Start

    queue.Get()

for i in 1..10 do
    printfn "%d" <| getOneOrOther ()

Console.ReadLine () |> ignore

它依赖于 FSharpx 项目的阻塞队列实现,您可能会出于其他原因需要它。但是,如果您不需要任何依赖项,

System.Collections.Concurrent
还包括一个阻塞队列,但界面稍差一些。

对于内置取消功能的更通用版本,下面的版本采用

Seq<unit -> Async<'T>>
并返回要通过的第一个结果,取消所有其他结果。

open FSharp.Control
open System.Threading

let async1 () = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 1 }
let async2 () = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 2 }

let getFirst asyncs =
    let queue = BlockingQueueAgent(1)
    let doWork operation = async { 
            let! result = operation()
            do! queue.AsyncAdd(result) }
    let start work =
        let cts = new CancellationTokenSource()
        Async.Start(work, cts.Token)
        cts
    let cancellationTokens =
        asyncs
        |> Seq.map doWork
        |> Seq.map start
    let result = queue.Get()
    cancellationTokens
    |> Seq.iter (fun cts -> cts.Cancel(); cts.Dispose())
    result

for i in 1..10 do
    printfn "%A" <| getFirst [async1;async2]

Console.ReadLine () |> ignore

3
投票

另一种基于事件的实现:

let Choice (asyncs: seq<Async<'T>>) : Async<'T> =
    async {
        let e = Event<'T>()
        let cts = new System.Threading.CancellationTokenSource()
        do Async.Start(
            asyncs
            |> Seq.map (fun a -> async { let! x = a in e.Trigger x })
            |> Async.Parallel
            |> Async.Ignore,
            cts.Token)
        let! result = Async.AwaitEvent e.Publish
        cts.Cancel()
        return result
    }

1
投票

似乎这个解决方案足够简单,无阻塞并且适用于我的情况

let any (list: Async<'T>[])=
    let tcs = new TaskCompletionSource<'T>()

    list |> Array.map (fun wf->Async.Start (async{
                let! res=wf
                tcs.TrySetResult (res) |> ignore
            }))
         |> ignore

    Async.AwaitTask tcs.Task

let async1 = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 1 }
let async2 = async {
        do! Async.Sleep (System.Random().Next(1000, 2000))
        return 2 }

printfn "%d" <| ([|async1;async2|] |> any |> Async.RunSynchronously)

© www.soinside.com 2019 - 2024. All rights reserved.