我有几个生产者,每个生产者都会向自己的流发出数据,然后我将它们合并为一个,进一步处理这些值并消耗它们:
fun makeInputFlow() = flow {
while (shouldMakeRequest()) {
// make (possibly blocking) network request
// do some CPU-intensive operations
results.forEach { emit(it) }
yield()
}
}
val input1 = makeInputFlow()
val input2 = makeInputFlow()
val input3 = makeInputFlow()
val merged = listOf(flow1, flow2, flow3)
.merge()
// process ...
.collect { println(it) }
这是可行的,但似乎当时只有一个流程实际产生值。 CPU大部分时间都处于空闲状态。
有没有一种方法可以在自己的协程/线程中安全地运行每个流程,以便所有流程并行产生值?
我们通常不允许在协程内执行阻塞 I/O,因为这可能会导致许多不同类型的问题,类似于您观察到的问题。
为了执行阻塞 I/O,我们需要临时切换到专门的线程池来处理此类工作负载:
while (shouldMakeRequest()) {
val results = withContext(Dispatchers.IO) {
// make (possibly blocking) network request
// do some CPU-intensive operations
}
results.forEach { emit(it) }
yield()
}
请注意,
Dispatchers.IO
对于 CPU 密集型操作来说并不是最佳选择。为此,最好使用Dispatchers.Default
。理想情况下,您应该使生产者逻辑成为可挂起的代码,根据工作负载在调度程序之间切换。如果您做不到这一点,Dispatchers.IO
可能是您最好的选择。