F# 中 Async.Sequential 的任务版本是什么

问题描述 投票:0回答:3

下面的代码曾经有效 - 但我正在将许多 async{} 函数转换为 task{},并且我不知道如何使其在没有可变变量的情况下按顺序运行。

let processBatchTransaction(page: IPage, 
                            bnzBatch: BNZBatch, 
                            cardTransactionswithOrder: BNZCardWithOrder list,
                            xeroClient: XeroClientFull) : Task<StatusMessage> = task {
    let matchedTransactionsSeq 
        = seq{
                for cardTransactionWithOrder in cardTransactionswithOrder do
                    let matchedTransaction = matchTransaction(page,bnzBatch,cardTransactionWithOrder, xeroClient)
                    yield matchedTransaction
        }

    let! matchedTransactions = matchedTransactionsSeq |> Async.Sequential
.... etc
}

“matchTransaction”函数曾经是异步的 - 但没有 Task.Sequential,只有 Task.WhenAll,但我需要它们一个接一个地同步运行。

asynchronous f# f#-async
3个回答
4
投票

编辑:请参阅下面的所有内容以获得更简单、更惯用的解决方案。线下的原始答案。


正如所承诺的(尽管布莱恩已经给出了一个很好的答案),这是我的。让我们首先了解一些规则:

  • 使用
    task { ... }
    创建的任务将始终是热启动的。这意味着它立即在当前线程(或后台线程,如果您使用
    backgroundTask
    )上运行。
  • 通过 TPL 使用
    Task<_>
    类创建的任务将始终延迟启动。就像
    async
    一样,你必须手动启动这些。
  • task
    CE 中绑定任何任务都会启动该任务。
  • 下一个绑定表达式只有在前一个绑定表达式完成后才会开始。

这意味着,给定带有

Task
的数组或序列,很可能在您获取它们时它们已经在运行。由于它们不是通过
bind
创建的,因此它们将异步运行。为了防止这种情况,我们需要一些规则。

编辑:如果您还没有检查过F#+,它可以开箱即用地执行以下操作。

规则 0:处理任务时,始终返回
task

组合任务、绑定任务或加入任务时,始终以

task
形式返回结果。这将简化您的整体流程,此外,您无法在不阻塞线程的情况下返回非任务(或非异步)。

规则 1:延迟你的任务

Brian 建议通过

lazy
来完成此操作。这可以。您也可以简单地使用单位函数:
fun() -> task { dosomething }

规则 2:不要使用
.Result
.Wait()

这些会阻塞你的线程。

规则 3:不要使用类似
List.traverseTaskResultA

的东西

这些函数是优秀的

FsToolkit.ErrorHandling.TaskResult
库的一部分,不会像
run X -> wait for result -> run Y -> wait for result
那样执行您的任务。相反,他们会
run X -> run Y -> run Z -> asTask

换句话说,那些库函数会导致异步、重叠执行。

规则 4:不要使用
Thread.Sleep

几乎没有规则,但是当我测试你的场景时,我使用了

Thread.Sleep
。问题是,这个函数阻塞了当前线程。它会给人一种错误的印象,即您的函数在测试场景中按顺序运行,但在现实场景中它们不再按顺序运行。

请使用

Task.Delay
代替。它的工作原理相同,但不会阻塞线程。

解决方案

有多种方法可以做到这一点。我将仅使用

ContinueWith
向您展示一个。步骤:

  • 作为单位职能,你的任务必须延迟返回
  • 你们的任务不依赖于彼此的结果(但这可以很容易地改变)
  • 将您的任务包含在延续中,然后再次
    Unwrap
    该延续。

最后一个看起来有点奇怪,但 TPL 没有标准

bind
。 F# 中的
bind
虽然适用于此,但有点难以使用。
Ply
lib 提供了更简单的绑定,您也可以尝试一下。

无论如何,这是最核心的方法,仅使用标准库函数。好消息是,您只需编写此函数一次。

/// Join multiple delayed tasks and return the result of the last
let join tasks =
    let wrapNext (t: unit -> Task<_>) (source: unit -> Task<_>): unit -> Task<_> =
        fun () ->
            source()
                // this is the CORE of the whole operation
                .ContinueWith((fun (_: Task) -> t ()), TaskContinuationOptions.OnlyOnRanToCompletion)
                // extra step needed, as BCL has no direct way to unwrap nested tasks
                .Unwrap() :?> Task<_>

    let rec combine acc (tasks: (unit -> Task<_>) list) =
        match tasks with
        | [] -> acc
        | t :: tail -> combine (wrapNext t acc) tail

    match tasks with
    | first :: rest -> combine first rest
    | [] -> failwith "oh oh, no tasks given!"

以下是如何使用它。第一个函数看起来有点老套,但它只是为了模仿您的场景并能够仔细检查它是否按顺序运行。

/// Create 10 tasks, use stream for writing, otherwise would garble FSI
/// Note that those can be task or backgroundTask
let createBunchOfTasks(sw: StreamWriter) =
    let mutable x = 0
    let rnd () = Random().Next(10, 30)
    let o = obj ()

    let runTask i = backgroundTask {
        let! _ = Task.Delay(rnd ())  // use randomization to ensure tasks last different times
        x <- x + 1

        // just some logging to a file, stdout is not good in this case
        lock o (fun () -> sw.WriteLine(sprintf "Task #%i after delay: %i" i x))
        return x
    }

    [
        // creating bunch of dummy tasks
        for i in 0..10 do
            fun () -> runTask i
    ]

在您的场景中,您不需要上述代码,但您将需要类似的代码来调用

join
函数。如果您需要记录到文件,您可以使用此模式,但它实际上只是用于我的健全性检查,并且相当粗糙;)。

let runMultipleTasks() =
    // should give this as argument, as must be closed after all tasks completed
    let file = File.Open("output1.txt", FileMode.Create)
    let stream = new StreamWriter(file)

    // the actual creation of tasks 
    let tasks = createBunchOfTasks stream
    let combinedTask = join tasks

    // start the combined tasks
    combinedTask()

更简单、更惯用的解决方案

在对任务列表、序列或数组进行更多修改后,主要问题是它们必须被延迟。事实证明,您可以使用

task
本身来做到这一点,使用
for

但是,您必须注意不要只使用

yield
,而是将
let!
yield
结合使用。这将确保按顺序等待任务:

let runMultipleTasks() = task {
    // should give this as argument, as must be closed after all tasks completed
    let file = File.Open("output1.txt", FileMode.Create)
    let stream = new StreamWriter(file)

    // the actual creation of tasks 
    let tasks = createBunchOfTasks stream |> Array.ofList
    let len = Array.length tasks
    let results = Array.zeroCreate len

    for i in 0..len - 1 do
        let! result = tasks[i]()  // ensure await-on-next
        results[i] <- result

    return List.ofArray results
}

注意:库

IcedTasks
有一个
ColdTask
类型,它允许以更简单的方式执行上述操作,就好像它是正常任务一样,并确保冷启动。


1
投票

你可以做的一件事就是利用惰性来防止任务开始得太早。例如:

let createTask n =
    task {
        printfn $"Task {n} started"
        Thread.Sleep(2000)
        printfn $"Task {n} finished"
        return 100 + n
    }

let lazyTasks =
    seq {
        for n = 1 to 10 do
            lazy createTask n
    }

所以在你的情况下,你会写:

for cardTransactionWithOrder in cardTransactionswithOrder do
    let lazyMatchedTransaction = lazy matchTransaction(page,bnzBatch,cardTransactionWithOrder, xeroClient)
    yield lazyMatchedTransaction

然后,您可以按顺序评估任何惰性任务序列,如下所示:

let evalTasksSequential lazyTasks =
    task {
        return seq {
            for (lazyTask : Lazy<Task<_>>) in lazyTasks do
                yield lazyTask.Value.Result   // start task and wait for it to finish
        }
    }

测试示例:

task {
    let! results = evalTasksSequential lazyTasks
    for result in results do
        printfn $"{result}"
} |> ignore

输出:

Task 1 started
Task 1 finished
101
Task 2 started
Task 2 finished
102
Task 3 started
Task 3 finished
103
Task 4 started
Task 4 finished
104
Task 5 started
Task 5 finished
105
Task 6 started
Task 6 finished
106
Task 7 started
Task 7 finished
107
Task 8 started
Task 8 finished
108
Task 9 started
Task 9 finished
109
Task 10 started
Task 10 finished
110

0
投票
cardTransactionswithOrders
|> List.map (fun cardTransactionWithOrder ->
  matchTransaction(page,bnzBatch,cardTransactionWithOrder, xeroClient) 
  |> Async.AwaitTask
  |> Async.RunSynchronously)
© www.soinside.com 2019 - 2024. All rights reserved.