在我的android项目中,我只想在订阅流后接收一次新数据,然后停止订阅。我对热流(SharedFlow)有所了解。
我的目标:
我的简化代码:
class Producer {
private val _event = MutableSharedFlow<String>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val event: Flow<String> = _event.asSharedFlow()
fun produce(event: String) {
_event.tryEmit(event)
}
}
class Consumer {
val producer = Producer()
var job: Job? = null
fun consume(viewModelScope: CoroutineScope) {
job = producer
.event
.onEach {
job?.cancel()
// process
}
.launchIn(viewModelScope)
}
}
它可以工作,但在我看来,某些功能可以被其他功能取代,例如:
onEach
Job
的引用并调用 launchIn
,而是尝试调用 .shareIn(viewModelScope, WhileSubscribed())
.onEach {
// job?.cancel() <- removed it
// process
}
.shareIn(viewModelScope, WhileSubscribed())
但它不起作用..
如何改进我的代码?
你试过
.take(2)
吗? 这将返回一个仅返回父流的前 2 个元素的流。 因此,如果 foo 是一个 StateFlow,那么 foo.take(2)
将返回一个仅发出初始值和下一个更新的流。 foo.take(2).onEach{}
将允许您在这 2 个元素中的每一个上运行 lambda。