Kotlin 中流发出的并行处理值

问题描述 投票:0回答:1

Kotlin 代码

runBlocking {
    flow {
        for (i in 0..4) {
            println("Emit $i")
            emit(i)
        }}  .onEach { if (it%2 == 0) delay(200) // Block 1
                println("A: got $it")
            }
            .onEach { println("B: got $it") } // Block 2
            .collect()
}

在控制台打印:

Emit 0
A: got 0
B: got 0
Emit 1
A: got 1
B: got 1
Emit 2
...

如何运行并行处理 block1 和 block2,以在一半情况下从块 2 获取块 1 之前的消息?

kotlin kotlin-coroutines coroutine kotlin-flow coroutinescope
1个回答
1
投票

您可以尝试在这些块中启动单独的协程:

private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

flow {
    for (i in 0..4) {
        println("Emit $i")
        emit(i)
    }
}.onEach { // Block 1
    scope.launch {
        if (it % 2 == 0) delay(200) 
        println("A: got $it")
    }

}.onEach { // Block 2
    scope.launch { 
        println("B: got $it") 
    }
}.launchIn(scope)

在这种情况下,

Block 2
将在
Block 1
之前打印。
SupervisorJob
需要在此处防止在其中一个协程失败时取消启动的协程。

此解决方案不保证顺序,例如下一个顺序可以有日志:

Emit 0
Emit 1
Emit 2
Emit 3
Emit 4

B: got 0
A: got 1
B: got 1
B: got 2
A: got 3
B: got 3
B: got 4
A: got 0
A: got 2
A: got 4

如果我们需要处理结果,我们也可以使用

callbackFlow

scope.launch {
    parallelIntFlow().collect {
        println("$it")
    }
} 

fun intFlow() = flow {
    for (i in 0..4) {
        println("Emit $i")
        emit(i)
    }
}

fun parallelIntFlow() = callbackFlow {
    // parallel processing of ints
    val job = intFlow()
        .onEach {
            launch {
                if (it % 2 == 0) delay(200)
                send("A: got $it")
            }
        }
        .onEach {
            launch {
                send("B: got $it")
            }
        }
        .launchIn(this)
    awaitClose {
        job.cancel()
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.