Kotlin 代码
runBlocking {
flow {
for (i in 0..4) {
println("Emit $i")
emit(i)
}} .onEach { if (it%2 == 0) delay(200) // Block 1
println("A: got $it")
}
.onEach { println("B: got $it") } // Block 2
.collect()
}
在控制台打印:
Emit 0
A: got 0
B: got 0
Emit 1
A: got 1
B: got 1
Emit 2
...
如何运行并行处理 block1 和 block2,以在一半情况下从块 2 获取块 1 之前的消息?
您可以尝试在这些块中启动单独的协程:
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
flow {
for (i in 0..4) {
println("Emit $i")
emit(i)
}
}.onEach { // Block 1
scope.launch {
if (it % 2 == 0) delay(200)
println("A: got $it")
}
}.onEach { // Block 2
scope.launch {
println("B: got $it")
}
}.launchIn(scope)
在这种情况下,
Block 2
将在Block 1
之前打印。 SupervisorJob
需要在此处防止在其中一个协程失败时取消启动的协程。
此解决方案不保证顺序,例如下一个顺序可以有日志:
Emit 0
Emit 1
Emit 2
Emit 3
Emit 4
B: got 0
A: got 1
B: got 1
B: got 2
A: got 3
B: got 3
B: got 4
A: got 0
A: got 2
A: got 4
如果我们需要处理结果,我们也可以使用
callbackFlow
:
scope.launch {
parallelIntFlow().collect {
println("$it")
}
}
fun intFlow() = flow {
for (i in 0..4) {
println("Emit $i")
emit(i)
}
}
fun parallelIntFlow() = callbackFlow {
// parallel processing of ints
val job = intFlow()
.onEach {
launch {
if (it % 2 == 0) delay(200)
send("A: got $it")
}
}
.onEach {
launch {
send("B: got $it")
}
}
.launchIn(this)
awaitClose {
job.cancel()
}
}