我在应用程序中遇到测试偶尔会失败的问题。我已经设法将问题范围缩小到这段代码片段。该代码片段包含两个 AsyncStream,它们都观察并修改相同的
@Published
属性 fire
。
我的期望是 AsyncStreams 将接收来自
@Published
属性的所有更新,确保测试始终成功。然而,经过一定次数的测试运行后(大约 < 100000), the test hangs and never completes.
起初,我怀疑存在数据争用问题,但即使使用
@MainActor
注释该类以确保整个类在同一线程上运行,问题仍然存在。
成功测试后,控制台会打印:
createStream1(): true
createStream2(): true
createStream1(): false
如果测试不成功,控制台会打印:
createStream1(): true
createStream2(): true
createStream1(): true
代码片段:
final class Tests: XCTestCase {
@MainActor func test_SUTStream1() async {
await SUT().toTest()
}
}
@MainActor
class SUT {
@Published private(set) var fire = false
init() {}
func toTest() async {
let cancelable = Task {
await createStream2()
}
await createStream1()
cancelable.cancel()
}
private func createStream1() async {
fire = true
for await didFire in $fire.values {
print("\(#function): \(didFire)")
if didFire == false {
break
}
}
}
private func createStream2() async {
for await didFire in $fire.values {
print("\(#function): \(didFire)")
if didFire == true {
fire = false
break
}
}
}
}
@Published
属性的更新?问题在于,
values
在存在背压的情况下不会缓冲其值。因此,如果在现有值被消耗之前发布后续值,则旧值将被删除。
考虑以下内容,其中我添加了延迟以一致地体现该行为:
@MainActor
class Foo {
@Published private(set) var value = 0
init() {}
func experiment() async throws {
let publishTask = Task {
for i in 0 ..< 10 {
value = i
try await Task.sleep(for: .milliseconds(300))
}
}
let consumeTask = Task {
for await value in $value.values {
print(value)
if value == 9 { break }
try await Task.sleep(for: .seconds(2))
}
}
try await withTaskCancellationHandler {
try await publishTask.value
try await consumeTask.value
} onCancel: {
publishTask.cancel()
consumeTask.cancel()
}
print("done")
}
}
调用
experiment
,会产生:
0
6
9
如果您不希望它删除值,您可以向
consumeTask
添加缓冲区:
let consumeTask = Task {
for await value in $value.buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest).values {
print(value)
if value == 9 { break }
try await Task.sleep(for: .seconds(2))
}
}
这将捕获所有值。