协程 - 仅接收新数据,然后停止订阅流

问题描述 投票:0回答:1

在我的android项目中,我只想在订阅流后接收一次新数据,然后停止订阅。我对热流(SharedFlow)有所了解。

我的目标:

  • 尝试发出数据(不暂停)
  • 订阅制作人
  • 接收新数据,而非历史数据,仅一次
  • 停止订阅

我的简化代码:

    class Producer {
        private val _event = MutableSharedFlow<String>(
            extraBufferCapacity = 1,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        )
        val event: Flow<String> = _event.asSharedFlow()
    
        fun produce(event: String) {
            _event.tryEmit(event)
        }
    }
    
    class Consumer {
        val producer = Producer()
        var job: Job? = null
    
        fun consume(viewModelScope: CoroutineScope) {
            job = producer
                .event
                .onEach {
                    job?.cancel()
                    // process
                }
                .launchIn(viewModelScope)
        }
    }


它可以工作,但在我看来,某些功能可以被其他功能取代,例如:

  • 如果我只想接收一次数据,那么它应该是另一个函数而不是
    onEach
  • 我没有存储对
    Job
    的引用并调用
    launchIn
    ,而是尝试调用
    .shareIn(viewModelScope, WhileSubscribed())
    .onEach {
        // job?.cancel() <- removed it
        // process
    }
    .shareIn(viewModelScope, WhileSubscribed())

但它不起作用..

如何改进我的代码?

android kotlin kotlin-coroutines coroutine
1个回答
0
投票

你试过

.take(2)
吗? 这将返回一个仅返回父流的前 2 个元素的流。 因此,如果 foo 是一个 StateFlow,那么
foo.take(2)
将返回一个仅发出初始值和下一个更新的流。
foo.take(2).onEach{}
将允许您在这 2 个元素中的每一个上运行 lambda。

© www.soinside.com 2019 - 2024. All rights reserved.