我在 Swift 中有一个缓慢的阻塞函数,我想以非阻塞(异步/等待)方式调用它。
这是原始的阻止代码:
// Original
func totalSizeBytesBlocking() -> UInt64 {
var total: UInt64 = 0
for resource in slowBlockingFunctionToGetResources() {
total += resource.fileSize
}
return total
}
这是我尝试使其成为非阻塞的:
// Attempt at async/await
func totalSizeBytesAsync() async -> UInt64 {
// I believe I need Task.detached instead of just Task.init to avoid blocking the main thread?
let handle = Task.detached {
return self.slowBlockingFunctionToGetResources()
}
// is this the right way to handle cancellation propagation with a detached Task?
// or should I be using Task.withTaskCancellationHandler?
if Task.isCancelled {
handle.cancel()
}
let resources = await handle.value
var total: UInt64 = 0
for resource in resources {
total += resource.fileSize
}
return total
}
我的目标是能够等待来自主参与者/线程的异步版本,并让它在后台线程上执行缓慢的阻塞工作,同时我继续更新主线程上的 UI。
这应该怎么做?
这里有几个问题。
如何办理取消?
您的怀疑是正确的,这个
Task.isCancelled
模式是不够的。问题在于,您在创建任务后立即进行测试,但它会很快通过该行代码并启动任务的await
。正如您所猜测的,在处理非结构化并发时,您需要withTaskCancellationHandler
:
func totalSizeBytesAsync() async throws -> UInt64 {
let task = Task.detached {
try self.slowBlockingFunctionToGetResources()
}
let resources = try await withTaskCancellationHandler {
try await task.value
} onCancel: {
task.cancel()
}
return resources.reduce(0) { $0 + $1.fileSize }
}
func slowBlockingFunctionToGetResources() throws -> [Resource] {
var resources: [Resource] = []
while !isDone {
try Task.checkCancellation()
resources.append(…)
}
return resources
}
可能不用说,如上所示,只有在慢速阻塞功能支持的情况下,取消才能正确工作(例如,定期尝试
checkCancellation
,或者不太理想的测试isCancelled
)。
如果这个缓慢的同步函数不处理取消,那么检查取消就没有什么意义,并且您将陷入一个在同步任务完成之前不会完成的任务。但至少
totalSizeBytesAsync
不会阻塞。例如:
func totalSizeBytesAsync() async -> UInt64 {
let resources = await Task.detached {
self.slowBlockingFunctionToGetResources()
}.value
return resources.reduce(0) { $0 + $1.fileSize }
}
func slowBlockingFunctionToGetResources() -> [Resource] {…}
您是否应该使用非结构化并发?
作为一般规则,我们应该避免因非结构化并发而使代码混乱(我们需要承担手动检查取消的负担)。因此,这就引出了一个问题:如何从当前演员手中接过任务。您可以将同步函数放入其自己的单独参与者中。或者,由于 SE-0338,您可以选择让您的慢速函数
nonisolated
async
,这样就可以从当前演员那里得到它:
func totalSizeBytesAsync() async throws -> UInt64 {
try await slowBlockingFunctionToGetResources()
.reduce(0) { $0 + $1.fileSize }
}
nonisolated func slowBlockingFunctionToGetResources() async throws -> [Resource] {
var resources: [Resource] = []
while !isDone {
try Task.checkCancellation()
resources.append(…)
}
return resources
}
但是通过保持结构化并发,我们的代码大大简化了。
显然,如果您想使用
actor
,请随意:
let resourceManager = ResourceManager()
func totalSizeBytesAsync() async throws -> UInt64 {
try await resourceManager.slowBlockingFunctionToGetResources()
.reduce(0) { $0 + $1.fileSize }
}
地点:
actor ResourceManager {
…
func slowBlockingFunctionToGetResources() throws -> [Resource] {
var resources: [Resource] = []
while !isDone {
try Task.checkCancellation()
resources.append(…)
}
return resources
}
}
同步功能有多慢?
Swift 并发依赖于“契约”来避免阻塞协作线程池中的任何线程。请参阅https://stackoverflow.com/a/74580345/1271826。
因此,如果这确实是一个慢速函数,我们确实应该定期将慢速进程
Task.yield()
发送到 Swift 并发系统,以避免潜在的死锁。例如,
func totalSizeBytesAsync() async throws -> UInt64 {
try await slowNonBlockingFunctionToGetResources()
.reduce(0) { $0 + $1.fileSize }
}
nonisolated func slowNonBlockingFunctionToGetResources() async throws -> [Resource] {
var resources: [Resource] = []
while !isDone {
await Task.yield()
try Task.checkCancellation()
resources.append(…)
}
return resources
}
现在,如果 (a) 您没有机会重构此函数来定期生成; (b) 它确实非常非常慢,那么 Apple 建议您从 Swift 并发系统中获取此功能。例如,在 WWDC 2022 视频可视化和优化 Swift 并发中,他们建议 GCD:
如果您有需要执行这些操作的代码 [无法定期向 Swift 并发系统屈服的缓慢同步函数],请将该代码移出并发线程池 - 例如,通过在调度队列上运行它 - 并桥接使用延续将其引入并发世界。只要有可能,请使用异步 API 进行阻塞操作,以保持系统平稳运行。
底线,要小心不要长时间阻塞协作线程池线程。