Swift 5.6 如何将异步任务放入队列

问题描述 投票:0回答:3

假设我有这个代码

class Duck{
    
    func walk() async {
        //do something
        print("walk start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("walk end")
    }
    
    func quack() async {
        //do something...
        print("quack start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("quack end")
    }
    
    func fly() async{
        //do something
        print("fly start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("fly end")
    }
    
}

let duck = Duck()

Task{
    await duck.walk()
}

Task{
    await duck.quack()
}

Task{
    await duck.fly()
}

这将打印

walk start
quack start
fly start
walk end
quack end
fly end

这是我理解和期望的。但是,如果我希望这 3 个

Task
按顺序运行怎么办?假设每个
Task
都是由用户按下按钮创建的。我希望任务在后台排队并一个接一个地运行。有没有什么东西可以在
DispatchWorkItem
排队
DispatchQueue
,但是
Task
版本?


编辑:

我想出了一个解决方案,但我不确定这是否是实施它的好方法。由于此实现可能会创建多层级联

Task
,我想知道是否会有堆栈溢出或内存泄漏的风险?

class TaskQueue{
    private var currentTask : Task<Void,Never> = Task{}
    
    func dispatch(block:@escaping () async ->Void){
        
        let oldTask = currentTask
        currentTask = Task{
            _ = await oldTask.value
            await block()
        }
    }
}

taskQueue.dispatch {
    await duck.walk()
}
taskQueue.dispatch {
    await duck.quack()
}
taskQueue.dispatch {
    await duck.fly()
}
ios swift asynchronous async-await concurrency
3个回答
4
投票

更新:

对于未来发现这篇文章有用的人,我创建了一个 swift 包,实现更好,并增加了对排队的支持

AsyncThrowingStream

https://github.com/rickymohk/SwiftTaskQueue


这是我更新的实施,我认为它比我在问题中发布的实施更安全。

TaskQueueActor
部分完成所有工作,我用外部类包装它只是为了在从非异步上下文调用时使其更清晰。

class TaskQueue{
    
    private actor TaskQueueActor{
        private var blocks : [() async -> Void] = []
        private var currentTask : Task<Void,Never>? = nil
        
        func addBlock(block:@escaping () async -> Void){
            blocks.append(block)
            next()
        }
        
        func next()
        {
            if(currentTask != nil) {
                return
            }
            if(!blocks.isEmpty)
            {
                let block = blocks.removeFirst()
                currentTask = Task{
                    await block()
                    currentTask = nil
                    next()
                }
            }
        }
    }
    private let taskQueueActor = TaskQueueActor()
    
    func dispatch(block:@escaping () async ->Void){
        Task{
            await taskQueueActor.addBlock(block: block)
        }
    }
}

1
投票

我在 Github 上找到了这个:https://github.com/gshahbazian/playgrounds/blob/main/AsyncAwait.playground/Sources/TaskQueue.swift

通过

https://forums.swift.org/t/how-do-you-use-asyncstream-to-make-task-execution-deterministic/57968/18

import Foundation

public actor TaskQueue {
    private let concurrency: Int
    private var running: Int = 0
    private var queue = [CheckedContinuation<Void, Error>]()

    public init(concurrency: Int) {
        self.concurrency = concurrency
    }

    deinit {
        for continuation in queue {
            continuation.resume(throwing: CancellationError())
        }
    }

    public func enqueue<T>(operation: @escaping @Sendable () async throws -> T) async throws -> T {
        try Task.checkCancellation()

        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
            queue.append(continuation)
            tryRunEnqueued()
        }

        defer {
            running -= 1
            tryRunEnqueued()
        }
        try Task.checkCancellation()
        return try await operation()
    }

    private func tryRunEnqueued() {
        guard !queue.isEmpty else { return }
        guard running < concurrency else { return }

        running += 1
        let continuation = queue.removeFirst()
        continuation.resume()
    }
}

似乎有效

@StateObject var taskQueue = TaskQueue(concurrency: 1)

            .task {
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 1")
                }
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 2")
                }
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 3")
                }

0
投票

我曾经是非结构化任务方法的支持者,在这种方法中,每个人都会

await
前一个。回想起来,这对我来说有点脆弱。我越来越多地(感谢 Rob Napier 在这个方向上推动我),我现在使用异步序列,特别是来自 Apple 的
AsyncChannel
swift-async-algorithms
。我认为这是一种更健壮的行为,更符合现代 Swift 并发的异步序列。

在我们开始您的示例之前,请考虑这个串行下载器,其中我们有一个进程(用户单击按钮)将

URL
对象发送到另一个进程,该进程监视
for
-
await
-
in
中的 URL 通道循环:

struct DownloadView: View {
    @StateObject var viewModel = DownloadViewModel()

    var body: some View {
        VStack {
            Button("1") { Task { await viewModel.appendDownload(1) } }
            Button("2") { Task { await viewModel.appendDownload(2) } }
            Button("3") { Task { await viewModel.appendDownload(3) } }
        }
        .task {
            await viewModel.monitorDownloadRequests()
        }
    }
}

@MainActor
class DownloadViewModel: ObservableObject {
    private let session: URLSession = …
    private let baseUrl: URL = …
    private let folder: URL = …
    private let channel = AsyncChannel<URL>()   // note, we're sending URLs on this channel

    func monitorDownloadRequests() async {
        for await url in channel {
            await download(url)
        }
    }

    func appendDownload(_ index: Int) async {
        let url = baseUrl.appending(component: "\(index).jpg")
        await channel.send(url)
    }

    func download(_ url: URL) async {
        do {
            let (location, _) = try await session.download(from: url)
            let fileUrl = folder.appending(component: url.lastPathComponent)
            try? FileManager.default.removeItem(at: fileUrl)
            try FileManager.default.moveItem(at: location, to: fileUrl)
        } catch {
            print(error)
        }
    }
}

我们开始

monitorDownloadRequests
然后
append
下载请求到频道。

这会串行执行请求(因为

monitorDownloadRequests
有一个
for
-
await
循环)。例如,在 Instruments 的“兴趣点”工具中,我在单击这些按钮的位置添加了一些Ⓢ路标,并显示了请求发生的间隔,您可以看到这三个请求是顺序发生的。

但是通道的美妙之处在于它们提供串行行为而不会引入 非结构化并发的问题。它们还自动处理取消(如果你想要那种行为)。如果您取消

for
-
await
-
in
循环(当视图被关闭时,
.task {…}
视图修饰符会在 SwiftUI 中自动为我们执行)。如果你有一堆非结构化并发,其中一个
Task
等待前一个,处理取消很快就会变得混乱。


现在,在您的情况下,您正在询问一个更通用的队列,您可以在其中等待任务。好吧,你可以有一个

AsyncChannel
的闭包:

typealias AsyncClosure = () async -> Void

let channel = AsyncChannel<AsyncClosure>()

例如:

typealias AsyncClosure = () async -> Void

struct ExperimentView: View {
    @StateObject var viewModel = ExperimentViewModel()

    var body: some View {
        VStack {
            Button("Red")   { Task { await viewModel.addRed() } }
            Button("Green") { Task { await viewModel.addGreen() } }
            Button("Blue")  { Task { await viewModel.addBlue() } }
        }
        .task {
            await viewModel.monitorChannel()
        }
    }
}

@MainActor
class ExperimentViewModel: ObservableObject {
    let channel = AsyncChannel<AsyncClosure>()

    func monitorChannel() async {
        for await task in channel {
            await task()
        }
    }

    func addRed() async {
        await channel.send { await self.red() }
    }

    func addGreen() async {
        await channel.send { await self.green() }
    }

    func addBlue() async {
        await channel.send { await self.blue() }
    }

    func red() async { … }

    func green() async { … }

    func blue() async { … }
}

产量:

在这里,我再次使用 Instruments 来可视化正在发生的事情。我快速地连续点击了“红色”、“绿色”和“蓝色”按钮两次。然后我观看了这三个秒任务的六个相应间隔。然后我第二次重复了这个六次点击过程,但这次我在他们完成之前关闭了有问题的视图,在第二个系列按钮点击的绿色任务中途,说明了

AsyncChannel
(和一般的异步序列)。

现在,我希望你能原谅我,因为我省略了创建所有这些“兴趣点”路标和间隔的代码,因为它增加了很多与手头问题无关的废话(但请参阅 this 如果你有兴趣)。但希望这些可视化有助于说明正在发生的事情。

关键信息是

AsyncChannel
(及其兄弟
AsyncThrowingChannel
)是一种保持结构化并发的好方法,但会得到串行(或受限行为,如本 answer 末尾所示)使用队列,但使用异步任务。

我必须承认,后一个

AsyncClosure
示例虽然有望回答您的问题,但在我看来有点勉强。我已经使用
AsyncChannel
几个月了,我个人总是有一个更具体的对象由频道处理(例如,URL、GPS 位置、图像标识符等)。这个带有闭包的例子感觉它试图重现老式的调度/操作队列行为有点太努力了。

© www.soinside.com 2019 - 2024. All rights reserved.