我试图了解任务到底如何与循环一起工作。
我正在尝试将数字输出到控制台,并注意到我得到的结果不是我预期的。
actor ThreadSafeCollection<T> {
private var collection: [T] = []
func add(_ element: T) {
collection.append(element)
}
func getAll() -> [T] {
return collection
}
func remove(at index: Int) {
guard collection.indices.contains(index) else { return }
collection.remove(at: index)
}
}
var safeCollection: ThreadSafeCollection<Int> = ThreadSafeCollection()
@Sendable func firstIterate() async -> Task<[Int], Never> {
Task {
for i in 0..<500 {
await safeCollection.add(i)
}
return await safeCollection.getAll()
}
}
@Sendable func secondIterate() async -> Task<[Int], Never> {
Task {
for i in 0..<500 {
await safeCollection.add(i)
}
return await safeCollection.getAll()
}
}
Task {
let result = await withTaskGroup(of: [Int].self, returning: [Int].self) { taskGroup in
taskGroup.addTask { await firstIterate().value }
taskGroup.addTask { await secondIterate().value }
var collected = [Int]()
for await value in taskGroup {
collected.append(contentsOf: value)
}
return collected
}
print(result.sorted(by: <))
}
在这个例子中,我通过调用第一个 Iterate() 和 secondaryIterate() 方法迭代 2 次到 500,结果我希望得到一个包含数字的数组,其中每个数字将重复 2 次。但相反,我在控制台中看到每个数字 4 次。
[0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4...
此外,我注意到在数组的末尾,数字重复的次数不是开头的 4 次,而是 3 次。
...494, 494, 494, 495, 495, 495, 496, 496, 496, 497, 497, 497, 498, 498, 498, 499, 499, 499]
谁能解释为什么会发生这种情况?
Paulw11 解释了为什么您通常会看到四个值(因为您将结果附加到您的
ThreadSafeCollection
,但每个任务返回整个当时当前的集合(即返回整个组合集合两次) Cy-4AH 解释了为什么有时会看到 3 次出现 499,因为在附加值和返回整个集合的(重复)副本之间存在竞争。
有两种解决方案:
鉴于您正在收集
ThreadSafeCollection
中的结果,请勿返回每个任务组子任务的整个集合。只需返回最后的getAll
的ThreadSafeCollection
即可:
final class ThreadSafeCollectionExample: Sendable {
var safeCollection: ThreadSafeCollection<Int> = []
func firstIterate() async {
for i in 0..<500 {
await safeCollection.append(i)
}
}
func secondIterate() async {
for i in 0..<500 {
await safeCollection.append(i)
}
}
}
Task {
let experiment = ThreadSafeCollectionExample()
let result = await withTaskGroup(of: Void.self) { group in // or `withDiscardingTaskGroup` in more current OS versions
group.addTask { await experiment.firstIterate() }
group.addTask { await experiment.secondIterate() }
await group.waitForAll()
return await experiment.safeCollection.getAll()
}
print(result.sorted(by: <))
}
这将返回
safeCollection
以及值 0..<500, as expected. But this introduces a lot of individual synchronizations, which can impact performance. (See point 2, below, if you want to see how to avoid that.) 的两个副本
不相关,但如果你原谅我,我调整了
ThreadSafeCollection
以使用更传统的命名约定(例如,append
与add
):
actor ThreadSafeCollection<T>: ExpressibleByArrayLiteral {
private var collection: [T]
init(collection: [T] = []) {
self.collection = collection
}
convenience init(arrayLiteral: T...) {
self.init(collection: arrayLiteral)
}
func append(_ element: T) {
collection.append(element)
}
func append(contentsOf elements: [T]) {
collection.append(contentsOf: elements)
}
func getAll() -> [T] {
return collection
}
func remove(at index: Int) {
guard collection.indices.contains(index) else { return }
collection.remove(at: index)
}
}
完全退休
ThreadSafeCollection
,并让例程返回局部变量:
final class UsingLocalCollections: Sendable {
func firstIterate() async -> [Int] {
var values: [Int] = []
for i in 0..<500 {
await values.append(i)
}
return values
}
func secondIterate() async -> [Int] {
var values: [Int] = []
for i in 0..<500 {
await values.append(i)
}
return values
}
}
Task {
let experiment = UsingLocalCollections()
let result = await withTaskGroup(of: [Int].self) { group in // or `withDiscardingTaskGroup` in more current OS versions
group.addTask { await experiment.firstIterate() }
group.addTask { await experiment.secondIterate() }
return await group.reduce(into: []) { partialResult, newValues in
partialResult.append(contentsOf: newValues)
}
}
print(result.sorted(by: <))
}
此方法的优点是不需要同步集合的 1,000 个更新中的每一个(这将破坏您通过同时运行这些更新所希望获得的任何性能优势)。
顺便说一句,回到你原来的例子,我可能建议避免不必要的非结构化并发,即使用
Task {…}
。
所以,而不是:
func firstIterate() -> Task<[Int], Never> {
Task {
for i in 0..<500 {
…
}
return await safeCollection.getAll()}
}
}
您可以保持结构化并发并将其简化为:
func firstIterate() async -> [Int] {
for i in 0..<500 {
…
}
return await safeCollection.getAll()}
}
然后,代替:
taskGroup.addTask { await secondIterate().value }
你只会:
taskGroup.addTask { await secondIterate() }
它不仅更干净,而且通过保持结构化并发,您可以享受任务取消等的自动传播。非结构化并发为您提供了更细粒度的控制,但给您带来了代码正确性的负担。因此,在真正需要的地方使用非结构化并发(这里不是这种情况),但在可能的情况下支持结构化并发。更简单。 (如果您确实需要如何正确使用非结构化并发的示例,请告诉我。)