用于阻塞(CPU 密集)任务的异步函数?

问题描述 投票:0回答:1

我在 Swift 中有一个缓慢的阻塞函数,我想以非阻塞(异步/等待)方式调用它。

这是原始的阻止代码:

// Original
func totalSizeBytesBlocking() -> UInt64 {
    var total: UInt64 = 0
    for resource in slowBlockingFunctionToGetResources() {
        total += resource.fileSize
    }
    return total
}

这是我尝试使其成为非阻塞的:

// Attempt at async/await
func totalSizeBytesAsync() async -> UInt64 {
    // I believe I need Task.detached instead of just Task.init to avoid blocking the main thread?
    let handle = Task.detached {
        return self.slowBlockingFunctionToGetResources()
    }
    // is this the right way to handle cancellation propagation with a detached Task?
    // or should I be using Task.withTaskCancellationHandler?
    if Task.isCancelled {
        handle.cancel()
    }
    let resources = await handle.value
    var total: UInt64 = 0
    for resource in resources {
        total += resource.fileSize
    }
    return total
}

我的目标是能够等待来自主参与者/线程的异步版本,并让它在后台线程上执行缓慢的阻塞工作,同时我继续更新主线程上的 UI。

这应该怎么做?

swift asynchronous async-await swift-concurrency
1个回答
0
投票

这里有几个问题。

  1. 如何办理取消?

    您的怀疑是正确的,这个

    Task.isCancelled
    模式是不够的。问题在于,您在创建任务后立即进行测试,但它会很快通过该行代码并启动任务的
    await
    。正如您所猜测的,在处理非结构化并发时,您需要
    withTaskCancellationHandler
    :

    func totalSizeBytesAsync() async throws -> UInt64 {
        let task = Task.detached {
            try self.slowBlockingFunctionToGetResources()
        }
        let resources = try await withTaskCancellationHandler {
            try await task.value
        } onCancel: {
            task.cancel()
        }
    
        return resources.reduce(0) { $0 + $1.fileSize }
    }
    
    func slowBlockingFunctionToGetResources() throws -> [Resource] {
        var resources: [Resource] = []
        while !isDone {
            try Task.checkCancellation()
            resources.append(…)
        }
        return resources
    }
    

    可能不用说,如上所示,只有在慢速阻塞功能支持的情况下,取消才能正确工作(例如,定期尝试

    checkCancellation
    ,或者不太理想的测试
    isCancelled
    )。

    如果这个缓慢的同步函数不处理取消,那么检查取消就没有什么意义,并且您将陷入一个在同步任务完成之前不会完成的任务。但至少

    totalSizeBytesAsync
    不会阻塞。例如:

    func totalSizeBytesAsync() async -> UInt64 {
        let resources = await Task.detached {
            self.slowBlockingFunctionToGetResources()
        }.value
    
        return resources.reduce(0) { $0 + $1.fileSize }
    }
    
    func slowBlockingFunctionToGetResources() -> [Resource] {…}
    
  2. 您是否应该使用非结构化并发?

    作为一般规则,我们应该避免因非结构化并发而使代码混乱(我们需要承担手动检查取消的负担)。因此,这就引出了一个问题:如何从当前演员手中接过任务。您可以将同步函数放入其自己的单独参与者中。或者,由于 SE-0338,您可以选择让您的慢速函数

    nonisolated
    async
    ,这样就可以从当前演员那里得到它:

    func totalSizeBytesAsync() async throws -> UInt64 {
        try await slowBlockingFunctionToGetResources()
            .reduce(0) { $0 + $1.fileSize }
    }
    
    nonisolated func slowBlockingFunctionToGetResources() async throws -> [Resource] {
        var resources: [Resource] = []
        while !isDone {
            try Task.checkCancellation()            
            resources.append(…)
        }
        return resources
    }
    

    但是通过保持结构化并发,我们的代码大大简化了。

    显然,如果您想使用

    actor
    ,请随意:

    let resourceManager = ResourceManager()
    
    func totalSizeBytesAsync() async throws -> UInt64 {
        try await resourceManager.slowBlockingFunctionToGetResources()
            .reduce(0) { $0 + $1.fileSize }
    }
    

    地点:

    actor ResourceManager {
        …
    
        func slowBlockingFunctionToGetResources() throws -> [Resource] {
            var resources: [Resource] = []
            while !isDone {
                try Task.checkCancellation()
                resources.append(…)
            }
            return resources
        }
    }
    
  3. 同步功能有多慢?

    Swift 并发依赖于“契约”来避免阻塞协作线程池中的任何线程。请参阅https://stackoverflow.com/a/74580345/1271826

    因此,如果这确实是一个慢速函数,我们确实应该定期将慢速进程

    Task.yield()
    发送到 Swift 并发系统,以避免潜在的死锁。例如,

    func totalSizeBytesAsync() async throws -> UInt64 {
        try await slowNonBlockingFunctionToGetResources()
            .reduce(0) { $0 + $1.fileSize }
    }
    
    nonisolated func slowNonBlockingFunctionToGetResources() async throws -> [Resource] {
        var resources: [Resource] = []
        while !isDone {
            await Task.yield()
            try Task.checkCancellation()
            resources.append(…)
        }
        return resources
    }
    

    现在,如果 (a) 您没有机会重构此函数来定期生成; (b) 它确实非常非常慢,那么 Apple 建议您从 Swift 并发系统中获取此功能。例如,在 WWDC 2022 视频可视化和优化 Swift 并发中,他们建议 GCD:

    如果您有需要执行这些操作的代码 [无法定期向 Swift 并发系统屈服的缓慢同步函数],请将该代码移出并发线程池 - 例如,通过在调度队列上运行它 - 并桥接使用延续将其引入并发世界。只要有可能,请使用异步 API 进行阻塞操作,以保持系统平稳运行。

    底线,要小心不要长时间阻塞协作线程池线程。

© www.soinside.com 2019 - 2024. All rights reserved.