下面的代码曾经有效 - 但我正在将许多 async{} 函数转换为 task{},并且我不知道如何使其在没有可变变量的情况下按顺序运行。
let processBatchTransaction(page: IPage,
bnzBatch: BNZBatch,
cardTransactionswithOrder: BNZCardWithOrder list,
xeroClient: XeroClientFull) : Task<StatusMessage> = task {
let matchedTransactionsSeq
= seq{
for cardTransactionWithOrder in cardTransactionswithOrder do
let matchedTransaction = matchTransaction(page,bnzBatch,cardTransactionWithOrder, xeroClient)
yield matchedTransaction
}
let! matchedTransactions = matchedTransactionsSeq |> Async.Sequential
.... etc
}
“matchTransaction”函数曾经是异步的 - 但没有 Task.Sequential,只有 Task.WhenAll,但我需要它们一个接一个地同步运行。
编辑:请参阅下面的所有内容以获得更简单、更惯用的解决方案。线下的原始答案。
正如所承诺的(尽管布莱恩已经给出了一个很好的答案),这是我的。让我们首先了解一些规则:
task { ... }
创建的任务将始终是热启动的。这意味着它立即在当前线程(或后台线程,如果您使用backgroundTask
)上运行。Task<_>
类创建的任务将始终延迟启动。就像async
一样,你必须手动启动这些。task
CE 中绑定任何任务都会启动该任务。这意味着,给定带有
Task
的数组或序列,很可能在您获取它们时它们已经在运行。由于它们不是通过 bind
创建的,因此它们将异步运行。为了防止这种情况,我们需要一些规则。
编辑:如果您还没有检查过F#+,它可以开箱即用地执行以下操作。
task
组合任务、绑定任务或加入任务时,始终以
task
形式返回结果。这将简化您的整体流程,此外,您无法在不阻塞线程的情况下返回非任务(或非异步)。
Brian 建议通过
lazy
来完成此操作。这可以。您也可以简单地使用单位函数:fun() -> task { dosomething }
。
.Result
或 .Wait()
这些会阻塞你的线程。
List.traverseTaskResultA
FsToolkit.ErrorHandling.TaskResult
库的一部分,不会像 run X -> wait for result -> run Y -> wait for result
那样执行您的任务。相反,他们会run X -> run Y -> run Z -> asTask
。
换句话说,那些库函数会导致异步、重叠执行。
Thread.Sleep
几乎没有规则,但是当我测试你的场景时,我使用了
Thread.Sleep
。问题是,这个函数阻塞了当前线程。它会给人一种错误的印象,即您的函数在测试场景中按顺序运行,但在现实场景中它们不再按顺序运行。
请使用
Task.Delay
代替。它的工作原理相同,但不会阻塞线程。
有多种方法可以做到这一点。我将仅使用
ContinueWith
向您展示一个。步骤:
Unwrap
该延续。最后一个看起来有点奇怪,但 TPL 没有标准
bind
。 F# 中的 bind
虽然适用于此,但有点难以使用。 Ply
lib 提供了更简单的绑定,您也可以尝试一下。
无论如何,这是最核心的方法,仅使用标准库函数。好消息是,您只需编写此函数一次。
/// Join multiple delayed tasks and return the result of the last
let join tasks =
let wrapNext (t: unit -> Task<_>) (source: unit -> Task<_>): unit -> Task<_> =
fun () ->
source()
// this is the CORE of the whole operation
.ContinueWith((fun (_: Task) -> t ()), TaskContinuationOptions.OnlyOnRanToCompletion)
// extra step needed, as BCL has no direct way to unwrap nested tasks
.Unwrap() :?> Task<_>
let rec combine acc (tasks: (unit -> Task<_>) list) =
match tasks with
| [] -> acc
| t :: tail -> combine (wrapNext t acc) tail
match tasks with
| first :: rest -> combine first rest
| [] -> failwith "oh oh, no tasks given!"
以下是如何使用它。第一个函数看起来有点老套,但它只是为了模仿您的场景并能够仔细检查它是否按顺序运行。
/// Create 10 tasks, use stream for writing, otherwise would garble FSI
/// Note that those can be task or backgroundTask
let createBunchOfTasks(sw: StreamWriter) =
let mutable x = 0
let rnd () = Random().Next(10, 30)
let o = obj ()
let runTask i = backgroundTask {
let! _ = Task.Delay(rnd ()) // use randomization to ensure tasks last different times
x <- x + 1
// just some logging to a file, stdout is not good in this case
lock o (fun () -> sw.WriteLine(sprintf "Task #%i after delay: %i" i x))
return x
}
[
// creating bunch of dummy tasks
for i in 0..10 do
fun () -> runTask i
]
在您的场景中,您不需要上述代码,但您将需要类似的代码来调用
join
函数。如果您需要记录到文件,您可以使用此模式,但它实际上只是用于我的健全性检查,并且相当粗糙;)。
let runMultipleTasks() =
// should give this as argument, as must be closed after all tasks completed
let file = File.Open("output1.txt", FileMode.Create)
let stream = new StreamWriter(file)
// the actual creation of tasks
let tasks = createBunchOfTasks stream
let combinedTask = join tasks
// start the combined tasks
combinedTask()
在对任务列表、序列或数组进行更多修改后,主要问题是它们必须被延迟。事实证明,您可以使用
task
本身来做到这一点,使用 for
。
但是,您必须注意不要只使用
yield
,而是将let!
与yield
结合使用。这将确保按顺序等待任务:
let runMultipleTasks() = task {
// should give this as argument, as must be closed after all tasks completed
let file = File.Open("output1.txt", FileMode.Create)
let stream = new StreamWriter(file)
// the actual creation of tasks
let tasks = createBunchOfTasks stream |> Array.ofList
let len = Array.length tasks
let results = Array.zeroCreate len
for i in 0..len - 1 do
let! result = tasks[i]() // ensure await-on-next
results[i] <- result
return List.ofArray results
}
IcedTasks
有一个 ColdTask
类型,它允许以更简单的方式执行上述操作,就好像它是正常任务一样,并确保冷启动。
你可以做的一件事就是利用惰性来防止任务开始得太早。例如:
let createTask n =
task {
printfn $"Task {n} started"
Thread.Sleep(2000)
printfn $"Task {n} finished"
return 100 + n
}
let lazyTasks =
seq {
for n = 1 to 10 do
lazy createTask n
}
所以在你的情况下,你会写:
for cardTransactionWithOrder in cardTransactionswithOrder do
let lazyMatchedTransaction = lazy matchTransaction(page,bnzBatch,cardTransactionWithOrder, xeroClient)
yield lazyMatchedTransaction
然后,您可以按顺序评估任何惰性任务序列,如下所示:
let evalTasksSequential lazyTasks =
task {
return seq {
for (lazyTask : Lazy<Task<_>>) in lazyTasks do
yield lazyTask.Value.Result // start task and wait for it to finish
}
}
测试示例:
task {
let! results = evalTasksSequential lazyTasks
for result in results do
printfn $"{result}"
} |> ignore
输出:
Task 1 started
Task 1 finished
101
Task 2 started
Task 2 finished
102
Task 3 started
Task 3 finished
103
Task 4 started
Task 4 finished
104
Task 5 started
Task 5 finished
105
Task 6 started
Task 6 finished
106
Task 7 started
Task 7 finished
107
Task 8 started
Task 8 finished
108
Task 9 started
Task 9 finished
109
Task 10 started
Task 10 finished
110
cardTransactionswithOrders
|> List.map (fun cardTransactionWithOrder ->
matchTransaction(page,bnzBatch,cardTransactionWithOrder, xeroClient)
|> Async.AwaitTask
|> Async.RunSynchronously)