从 SharedFlow 高效批量收集事件

问题描述 投票:0回答:2

我只有一个

SharedFlow
。收集和处理每个事件的成本很高,但是消耗和处理 100 个事件只比处理单个事件稍微贵一点,因此我需要批处理或缓冲 SharedFlow 的结果以一次处理多个事件。

SharedFlow
会间歇性地发射,但发射时的发射速率极高,比
handle
功能的处理速度还要快。由于资源有限(内存不足且处理器已经有限),我无法简单地从另一个
CoroutineScope
启动来处理单个事件。如果我单独处理每个事件,内存会填满太快。

我必须尽可能快速有效地消费事件而不造成损失。

val injectedSharedFlow: SharedFlow<MyData>

// My inefficient handling
aCoroutineScope.launch {
    
    injectedSharedFlow.collect { aSingleMyData ->
        handleSingleMyData(aSingleMyData)
    }
    
}

// What I want to do
aCoroutineScope.launch {

    // Collect up to 100 items while there are items in flow
    injectedSharedFlow.collectMany { multipleDataList ->
        handleMultipleMyData(multipleDataList)
    }

}

有没有简单的方法可以做到这一点?

android kotlin kotlin-coroutines coroutine
2个回答
0
投票

这是构建此运算符的不同方法。可能值得对它们进行基准测试以进行比较。

fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = flow {
    var list = ArrayList(size)
    collect {
        list += it
        if (list.size == size) {
            emit(list)
            list = ArrayList(size)
        }
    }
}

0
投票

回复得体。我将其封装在一个服务中,将列表提升到类级别,然后您可以拥有第二个函数,该函数在第一个作业取消时运行,以清除列表中的剩余值,因此不会丢失数据。

© www.soinside.com 2019 - 2024. All rights reserved.