Swift 异步序列缺少 @Published 属性的更新

问题描述 投票:0回答:1

我在应用程序中遇到测试偶尔会失败的问题。我已经设法将问题范围缩小到这段代码片段。该代码片段包含两个异步序列,它们都观察并修改相同的

@Published
属性
fire

我的期望是异步序列将接收来自

@Published
属性的所有更新,确保测试始终成功。然而,经过一定次数的测试运行后(大约 < 100,000), the test hangs and never completes.

起初,我怀疑存在数据争用问题,但即使使用

@MainActor
注释该类以确保整个类在同一线程上运行,问题仍然存在。

成功测试后,控制台会打印:

createStream1(): true
createStream2(): true
createStream1(): false

如果测试不成功,控制台会打印:

createStream1(): true
createStream2(): true
createStream1(): true

代码片段:

final class Tests: XCTestCase {
    @MainActor func test_SUTStream1() async {
        await SUT().toTest()
    }
}

@MainActor
class SUT {
    @Published private(set) var fire = false

    init() {}

    func toTest() async {
        let cancelable = Task {
            await createStream2()
        }
        await createStream1()
        cancelable.cancel()
    }

    private func createStream1() async {
        fire = true

        for await didFire in $fire.values {
            print("\(#function): \(didFire)")
            if didFire == false {
                break
            }
        }
    }

    private func createStream2() async {
        for await didFire in $fire.values {
            print("\(#function): \(didFire)")
            if didFire == true {
                fire = false
                break
            }
        }
    }
}
  • 这个问题的原因是什么?为什么异步序列有时会错过
    @Published
    属性的更新?
  • 这个问题如何解决?
swift concurrency task combine property-wrapper-published
1个回答
1
投票

问题是

values
在存在背压的情况下不会缓冲其值。因此,如果在现有值被消耗之前发布后续值,则前一个值将被删除。

考虑以下内容,其中我添加了延迟以一致地体现该行为:

@MainActor
class Foo {
    @Published private(set) var value = 0

    init() {}

    func experiment() async throws {
        let publishTask = Task {
            for i in 0 ..< 10 {
                value = i
                try await Task.sleep(for: .milliseconds(300))
            }
        }
        
        let consumeTask = Task {
            for await value in $value.values {
                print(value)
                if value == 9 { break }
                try await Task.sleep(for: .seconds(2))
            }
        }
        
        try await withTaskCancellationHandler {
            try await publishTask.value
            try await consumeTask.value
        } onCancel: { 
            publishTask.cancel()
            consumeTask.cancel()
        }
        
        print("done")
    }
}

调用

experiment
,会产生:

0
6
9

如果您不希望它删除值,您可以在 buffer

 上添加 
consumeTask

let consumeTask = Task {
    let sequence = $value
        .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
        .values

    for await value in sequence {
        print(value)
        if value == 9 { break }
        try await Task.sleep(for: .seconds(2))
    }
}

这将捕获所有值(当然,直到缓冲区大小)。

还有其他模式可以用来支持不同的背压行为,但希望这能说明根本问题。

© www.soinside.com 2019 - 2024. All rights reserved.