有很多如何在 f# 中执行异步任务的示例
[dowork 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously
但是我如何才能异步等待第一个结果呢?
例如,如果我想运行一些并行查找任务,并且我想在获得第一个成功结果时进行更深入的搜索。 (基本上相当于 C# 世界中的
Task.WhenAny
...)
我会用类似的东西:
let any asyncs =
async {
let t =
asyncs
|> Seq.map Async.StartAsTask
|> System.Threading.Tasks.Task.WhenAny
return t.Result.Result }
可以在以下代码片段中找到通用解决方案:http://fssnip.net/dN
Async.Choice
可以嵌入到任何异步工作流程中,就像 Async.Parallel
一样。可选的输出类型编码了子计算完成但没有令人满意的结果的可能性。
我能想到的最简单的实现看起来像这样:
open FSharp.Control
let getOneOrOther () =
let queue = BlockingQueueAgent(1)
let async1 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
do! queue.AsyncAdd(1) } |> Async.Start
let async2 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
do! queue.AsyncAdd(2) } |> Async.Start
queue.Get()
for i in 1..10 do
printfn "%d" <| getOneOrOther ()
Console.ReadLine () |> ignore
它依赖于 FSharpx 项目的阻塞队列实现,您可能会出于其他原因需要它。但是,如果您不需要任何依赖项,
System.Collections.Concurrent
还包括一个阻塞队列,但界面稍差一些。
对于内置取消功能的更通用版本,下面的版本采用
Seq<unit -> Async<'T>>
并返回要通过的第一个结果,取消所有其他结果。
open FSharp.Control
open System.Threading
let async1 () = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 1 }
let async2 () = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 2 }
let getFirst asyncs =
let queue = BlockingQueueAgent(1)
let doWork operation = async {
let! result = operation()
do! queue.AsyncAdd(result) }
let start work =
let cts = new CancellationTokenSource()
Async.Start(work, cts.Token)
cts
let cancellationTokens =
asyncs
|> Seq.map doWork
|> Seq.map start
let result = queue.Get()
cancellationTokens
|> Seq.iter (fun cts -> cts.Cancel(); cts.Dispose())
result
for i in 1..10 do
printfn "%A" <| getFirst [async1;async2]
Console.ReadLine () |> ignore
另一种基于事件的实现:
let Choice (asyncs: seq<Async<'T>>) : Async<'T> =
async {
let e = Event<'T>()
let cts = new System.Threading.CancellationTokenSource()
do Async.Start(
asyncs
|> Seq.map (fun a -> async { let! x = a in e.Trigger x })
|> Async.Parallel
|> Async.Ignore,
cts.Token)
let! result = Async.AwaitEvent e.Publish
cts.Cancel()
return result
}
似乎这个解决方案足够简单,无阻塞并且适用于我的情况
let any (list: Async<'T>[])=
let tcs = new TaskCompletionSource<'T>()
list |> Array.map (fun wf->Async.Start (async{
let! res=wf
tcs.TrySetResult (res) |> ignore
}))
|> ignore
Async.AwaitTask tcs.Task
let async1 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 1 }
let async2 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 2 }
printfn "%d" <| ([|async1;async2|] |> any |> Async.RunSynchronously)
自 F# 4.5 起,它内置为
Async.Choice
https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Choice