任务中的 Swift 异步循环

问题描述 投票:0回答:1

我试图了解任务到底如何与循环一起工作。

我正在尝试将数字输出到控制台,并注意到我得到的结果不是我预期的。

actor ThreadSafeCollection<T> {
    private var collection: [T] = []

    func add(_ element: T) {
        collection.append(element)
    }

    func getAll() -> [T] {
        return collection
    }

    func remove(at index: Int) {
        guard collection.indices.contains(index) else { return }
        collection.remove(at: index)
    }
}

var safeCollection: ThreadSafeCollection<Int> = ThreadSafeCollection()

@Sendable func firstIterate() async -> Task<[Int], Never> {
    Task {
        for i in 0..<500 {
            await safeCollection.add(i)
        }
        return await safeCollection.getAll()
    }
}

@Sendable func secondIterate() async -> Task<[Int], Never> {
    Task {
        for i in 0..<500 {
            await safeCollection.add(i)
        }
        return await safeCollection.getAll()
    }
}

Task {
    let result = await withTaskGroup(of: [Int].self, returning: [Int].self) { taskGroup in
        taskGroup.addTask { await firstIterate().value }
        taskGroup.addTask { await secondIterate().value }
        
        var collected = [Int]()
        
        for await value in taskGroup {
            collected.append(contentsOf: value)
        }

        return collected
    }
    print(result.sorted(by: <))
}


在这个例子中,我通过调用第一个 Iterate() 和 secondaryIterate() 方法迭代 2 次到 500,结果我希望得到一个包含数字的数组,其中每个数字将重复 2 次。但相反,我在控制台中看到每个数字 4 次。

[0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4...

此外,我注意到在数组的末尾,数字重复的次数不是开头的 4 次,而是 3 次。

...494, 494, 494, 495, 495, 495, 496, 496, 496, 497, 497, 497, 498, 498, 498, 499, 499, 499]

谁能解释为什么会发生这种情况?

ios arrays swift multithreading async-await
1个回答
0
投票

Paulw11 解释了为什么您通常会看到四个值(因为您将结果附加到您的

ThreadSafeCollection
,但每个任务返回整个当时当前的集合(即返回整个组合集合两次) Cy-4AH 解释了为什么有时会看到 3 次出现 499,因为在附加值和返回整个集合的(重复)副本之间存在竞争。

有两种解决方案:

  1. 鉴于您正在收集

    ThreadSafeCollection
    中的结果,请勿返回每个任务组子任务的整个集合。只需返回最后的
    getAll
    ThreadSafeCollection
    即可:

    final class ThreadSafeCollectionExample: Sendable {
        var safeCollection: ThreadSafeCollection<Int> = []
    
        func firstIterate() async {
            for i in 0..<500 {
                await safeCollection.append(i)
            }
        }
    
        func secondIterate() async {
            for i in 0..<500 {
                await safeCollection.append(i)
            }
        }
    }
    
    Task {
        let experiment = ThreadSafeCollectionExample()
    
        let result = await withTaskGroup(of: Void.self) { group in // or `withDiscardingTaskGroup` in more current OS versions
            group.addTask { await experiment.firstIterate() }
            group.addTask { await experiment.secondIterate() }
    
            await group.waitForAll()
    
            return await experiment.safeCollection.getAll()
        }
        print(result.sorted(by: <))
    }
    

    这将返回

    safeCollection
    以及值 0..<500, as expected. But this introduces a lot of individual synchronizations, which can impact performance. (See point 2, below, if you want to see how to avoid that.)

    的两个副本

    不相关,但如果你原谅我,我调整了

    ThreadSafeCollection
    以使用更传统的命名约定(例如,
    append
    add
    ):

    actor ThreadSafeCollection<T>: ExpressibleByArrayLiteral {
        private var collection: [T]
    
        init(collection: [T] = []) {
            self.collection = collection
        }
    
        convenience init(arrayLiteral: T...) {
            self.init(collection: arrayLiteral)
        }
    
        func append(_ element: T) {
            collection.append(element)
        }
    
        func append(contentsOf elements: [T]) {
            collection.append(contentsOf: elements)
        }
    
        func getAll() -> [T] {
            return collection
        }
    
        func remove(at index: Int) {
            guard collection.indices.contains(index) else { return }
            collection.remove(at: index)
        }
    }
    
  2. 完全退休

    ThreadSafeCollection
    ,并让例程返回局部变量:

    final class UsingLocalCollections: Sendable {
        func firstIterate() async -> [Int] {
            var values: [Int] = []
            for i in 0..<500 {
                await values.append(i)
            }
            return values
        }
    
        func secondIterate() async -> [Int] {
            var values: [Int] = []
            for i in 0..<500 {
                await values.append(i)
            }
            return values
        }
    }
    
    Task {
        let experiment = UsingLocalCollections()
    
        let result = await withTaskGroup(of: [Int].self) { group in // or `withDiscardingTaskGroup` in more current OS versions
            group.addTask { await experiment.firstIterate() }
            group.addTask { await experiment.secondIterate() }
    
            return await group.reduce(into: []) { partialResult, newValues in
                partialResult.append(contentsOf: newValues)
            }
        }
        print(result.sorted(by: <))
    }
    

    此方法的优点是不需要同步集合的 1,000 个更新中的每一个(这将破坏您通过同时运行这些更新所希望获得的任何性能优势)。


顺便说一句,回到你原来的例子,我可能建议避免不必要的非结构化并发,即使用

Task {…}

所以,而不是:

func firstIterate() -> Task<[Int], Never> {
    Task {
        for i in 0..<500 {
            …
        }
        return await safeCollection.getAll()}
    }
}

您可以保持结构化并发并将其简化为:

func firstIterate() async -> [Int] {
    for i in 0..<500 {
        …
    }
    return await safeCollection.getAll()}
}

然后,代替:

taskGroup.addTask { await secondIterate().value }

你只会:

taskGroup.addTask { await secondIterate() }

它不仅更干净,而且通过保持结构化并发,您可以享受任务取消等的自动传播。非结构化并发为您提供了更细粒度的控制,但给您带来了代码正确性的负担。因此,在真正需要的地方使用非结构化并发(这里不是这种情况),但在可能的情况下支持结构化并发。更简单。 (如果您确实需要如何正确使用非结构化并发的示例,请告诉我。)

© www.soinside.com 2019 - 2024. All rights reserved.