我只有一个
SharedFlow
。收集和处理每个事件的成本很高,但是消耗和处理 100 个事件只比处理单个事件稍微贵一点,因此我需要批处理或缓冲 SharedFlow 的结果以一次处理多个事件。
SharedFlow
会间歇性地发射,但发射时的发射速率极高,比 handle
功能的处理速度还要快。由于资源有限(内存不足且处理器已经有限),我无法简单地从另一个 CoroutineScope
启动来处理单个事件。如果我单独处理每个事件,内存会填满太快。
我必须尽可能快速有效地消费事件而不造成损失。
val injectedSharedFlow: SharedFlow<MyData>
// My inefficient handling
aCoroutineScope.launch {
injectedSharedFlow.collect { aSingleMyData ->
handleSingleMyData(aSingleMyData)
}
}
// What I want to do
aCoroutineScope.launch {
// Collect up to 100 items while there are items in flow
injectedSharedFlow.collectMany { multipleDataList ->
handleMultipleMyData(multipleDataList)
}
}
有没有简单的方法可以做到这一点?
这是构建此运算符的不同方法。可能值得对它们进行基准测试以进行比较。
fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = flow {
var list = ArrayList(size)
collect {
list += it
if (list.size == size) {
emit(list)
list = ArrayList(size)
}
}
}
回复得体。我将其封装在一个服务中,将列表提升到类级别,然后您可以拥有第二个函数,该函数在第一个作业取消时运行,以清除列表中的剩余值,因此不会丢失数据。