来自多个协程/线程的流生产者/构建器

问题描述 投票:0回答:1

我有几个生产者,每个生产者都会向自己的流发出数据,然后我将它们合并为一个,进一步处理这些值并消耗它们:

fun makeInputFlow() = flow { 
  while (shouldMakeRequest()) {
    // make (possibly blocking) network request
    // do some CPU-intensive operations
    results.forEach { emit(it) }
    yield()
  }
}

val input1 = makeInputFlow()
val input2 = makeInputFlow()
val input3 = makeInputFlow()

val merged = listOf(flow1, flow2, flow3)
  .merge()
  // process ...
  .collect { println(it) }

这是可行的,但似乎当时只有一个流程实际产生值。 CPU大部分时间都处于空闲状态。

有没有一种方法可以在自己的协程/线程中安全地运行每个流程,以便所有流程并行产生值?

multithreading kotlin concurrency kotlin-coroutines kotlin-flow
1个回答
0
投票

我们通常不允许在协程内执行阻塞 I/O,因为这可能会导致许多不同类型的问题,类似于您观察到的问题。

为了执行阻塞 I/O,我们需要临时切换到专门的线程池来处理此类工作负载:

  while (shouldMakeRequest()) {
    val results = withContext(Dispatchers.IO) {
    // make (possibly blocking) network request
    // do some CPU-intensive operations
    }
    results.forEach { emit(it) }
    yield()
  }

请注意,

Dispatchers.IO
对于 CPU 密集型操作来说并不是最佳选择。为此,最好使用
Dispatchers.Default
。理想情况下,您应该使生产者逻辑成为可挂起的代码,根据工作负载在调度程序之间切换。如果您做不到这一点,
Dispatchers.IO
可能是您最好的选择。

© www.soinside.com 2019 - 2024. All rights reserved.