假设我有这个代码
class Duck{
func walk() async {
//do something
print("walk start")
try? await Task.sleep(nanoseconds: UInt64(2e9))
print("walk end")
}
func quack() async {
//do something...
print("quack start")
try? await Task.sleep(nanoseconds: UInt64(2e9))
print("quack end")
}
func fly() async{
//do something
print("fly start")
try? await Task.sleep(nanoseconds: UInt64(2e9))
print("fly end")
}
}
let duck = Duck()
Task{
await duck.walk()
}
Task{
await duck.quack()
}
Task{
await duck.fly()
}
这将打印
walk start
quack start
fly start
walk end
quack end
fly end
这是我理解和期望的。但是,如果我希望这 3 个
Task
按顺序运行怎么办?假设每个Task
都是由用户按下按钮创建的。我希望任务在后台排队并一个接一个地运行。有没有什么东西可以在DispatchWorkItem
排队DispatchQueue
,但是Task
版本?
我想出了一个解决方案,但我不确定这是否是实施它的好方法。由于此实现可能会创建多层级联
Task
,我想知道是否会有堆栈溢出或内存泄漏的风险?
class TaskQueue{
private var currentTask : Task<Void,Never> = Task{}
func dispatch(block:@escaping () async ->Void){
let oldTask = currentTask
currentTask = Task{
_ = await oldTask.value
await block()
}
}
}
taskQueue.dispatch {
await duck.walk()
}
taskQueue.dispatch {
await duck.quack()
}
taskQueue.dispatch {
await duck.fly()
}
对于未来发现这篇文章有用的人,我创建了一个 swift 包,实现更好,并增加了对排队的支持
AsyncThrowingStream
。
https://github.com/rickymohk/SwiftTaskQueue
这是我更新的实施,我认为它比我在问题中发布的实施更安全。
TaskQueueActor
部分完成所有工作,我用外部类包装它只是为了在从非异步上下文调用时使其更清晰。
class TaskQueue{
private actor TaskQueueActor{
private var blocks : [() async -> Void] = []
private var currentTask : Task<Void,Never>? = nil
func addBlock(block:@escaping () async -> Void){
blocks.append(block)
next()
}
func next()
{
if(currentTask != nil) {
return
}
if(!blocks.isEmpty)
{
let block = blocks.removeFirst()
currentTask = Task{
await block()
currentTask = nil
next()
}
}
}
}
private let taskQueueActor = TaskQueueActor()
func dispatch(block:@escaping () async ->Void){
Task{
await taskQueueActor.addBlock(block: block)
}
}
}
我在 Github 上找到了这个:https://github.com/gshahbazian/playgrounds/blob/main/AsyncAwait.playground/Sources/TaskQueue.swift
通过
https://forums.swift.org/t/how-do-you-use-asyncstream-to-make-task-execution-deterministic/57968/18
import Foundation
public actor TaskQueue {
private let concurrency: Int
private var running: Int = 0
private var queue = [CheckedContinuation<Void, Error>]()
public init(concurrency: Int) {
self.concurrency = concurrency
}
deinit {
for continuation in queue {
continuation.resume(throwing: CancellationError())
}
}
public func enqueue<T>(operation: @escaping @Sendable () async throws -> T) async throws -> T {
try Task.checkCancellation()
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
queue.append(continuation)
tryRunEnqueued()
}
defer {
running -= 1
tryRunEnqueued()
}
try Task.checkCancellation()
return try await operation()
}
private func tryRunEnqueued() {
guard !queue.isEmpty else { return }
guard running < concurrency else { return }
running += 1
let continuation = queue.removeFirst()
continuation.resume()
}
}
似乎有效
@StateObject var taskQueue = TaskQueue(concurrency: 1)
.task {
try? await taskQueue.enqueue {
//Task{
try? await Task.sleep(for: .seconds(1))
print("Done 1")
}
try? await taskQueue.enqueue {
//Task{
try? await Task.sleep(for: .seconds(1))
print("Done 2")
}
try? await taskQueue.enqueue {
//Task{
try? await Task.sleep(for: .seconds(1))
print("Done 3")
}
我曾经是非结构化任务方法的支持者,在这种方法中,每个人都会
await
前一个。回想起来,这对我来说有点脆弱。我越来越多地(感谢 Rob Napier 在这个方向上推动我),我现在使用异步序列,特别是来自 Apple 的 AsyncChannel
的 swift-async-algorithms
。我认为这是一种更健壮的行为,更符合现代 Swift 并发的异步序列。
在我们开始您的示例之前,请考虑这个串行下载器,其中我们有一个进程(用户单击按钮)将
URL
对象发送到另一个进程,该进程监视 for
-await
-in
中的 URL 通道循环:
struct DownloadView: View {
@StateObject var viewModel = DownloadViewModel()
var body: some View {
VStack {
Button("1") { Task { await viewModel.appendDownload(1) } }
Button("2") { Task { await viewModel.appendDownload(2) } }
Button("3") { Task { await viewModel.appendDownload(3) } }
}
.task {
await viewModel.monitorDownloadRequests()
}
}
}
@MainActor
class DownloadViewModel: ObservableObject {
private let session: URLSession = …
private let baseUrl: URL = …
private let folder: URL = …
private let channel = AsyncChannel<URL>() // note, we're sending URLs on this channel
func monitorDownloadRequests() async {
for await url in channel {
await download(url)
}
}
func appendDownload(_ index: Int) async {
let url = baseUrl.appending(component: "\(index).jpg")
await channel.send(url)
}
func download(_ url: URL) async {
do {
let (location, _) = try await session.download(from: url)
let fileUrl = folder.appending(component: url.lastPathComponent)
try? FileManager.default.removeItem(at: fileUrl)
try FileManager.default.moveItem(at: location, to: fileUrl)
} catch {
print(error)
}
}
}
我们开始
monitorDownloadRequests
然后append
下载请求到频道。
这会串行执行请求(因为
monitorDownloadRequests
有一个for
-await
循环)。例如,在 Instruments 的“兴趣点”工具中,我在单击这些按钮的位置添加了一些Ⓢ路标,并显示了请求发生的间隔,您可以看到这三个请求是顺序发生的。
但是通道的美妙之处在于它们提供串行行为而不会引入 非结构化并发的问题。它们还自动处理取消(如果你想要那种行为)。如果您取消
for
-await
-in
循环(当视图被关闭时,.task {…}
视图修饰符会在 SwiftUI 中自动为我们执行)。如果你有一堆非结构化并发,其中一个 Task
等待前一个,处理取消很快就会变得混乱。
现在,在您的情况下,您正在询问一个更通用的队列,您可以在其中等待任务。好吧,你可以有一个
AsyncChannel
的闭包:
typealias AsyncClosure = () async -> Void
let channel = AsyncChannel<AsyncClosure>()
例如:
typealias AsyncClosure = () async -> Void
struct ExperimentView: View {
@StateObject var viewModel = ExperimentViewModel()
var body: some View {
VStack {
Button("Red") { Task { await viewModel.addRed() } }
Button("Green") { Task { await viewModel.addGreen() } }
Button("Blue") { Task { await viewModel.addBlue() } }
}
.task {
await viewModel.monitorChannel()
}
}
}
@MainActor
class ExperimentViewModel: ObservableObject {
let channel = AsyncChannel<AsyncClosure>()
func monitorChannel() async {
for await task in channel {
await task()
}
}
func addRed() async {
await channel.send { await self.red() }
}
func addGreen() async {
await channel.send { await self.green() }
}
func addBlue() async {
await channel.send { await self.blue() }
}
func red() async { … }
func green() async { … }
func blue() async { … }
}
产量:
在这里,我再次使用 Instruments 来可视化正在发生的事情。我快速地连续点击了“红色”、“绿色”和“蓝色”按钮两次。然后我观看了这三个秒任务的六个相应间隔。然后我第二次重复了这个六次点击过程,但这次我在他们完成之前关闭了有问题的视图,在第二个系列按钮点击的绿色任务中途,说明了
AsyncChannel
(和一般的异步序列)。
现在,我希望你能原谅我,因为我省略了创建所有这些“兴趣点”路标和间隔的代码,因为它增加了很多与手头问题无关的废话(但请参阅 this 如果你有兴趣)。但希望这些可视化有助于说明正在发生的事情。
关键信息是
AsyncChannel
(及其兄弟 AsyncThrowingChannel
)是一种保持结构化并发的好方法,但会得到串行(或受限行为,如本 answer 末尾所示)使用队列,但使用异步任务。
我必须承认,后一个
AsyncClosure
示例虽然有望回答您的问题,但在我看来有点勉强。我已经使用 AsyncChannel
几个月了,我个人总是有一个更具体的对象由频道处理(例如,URL、GPS 位置、图像标识符等)。这个带有闭包的例子感觉它试图重现老式的调度/操作队列行为有点太努力了。