我有一个
Flow<List<Int?>>
,我想收集这个流,但前提是我得到一个 null Int。然后流程应该被取消。例如,
val flow = flowOf(
listOf(1, 2),
listOf(3, null),
listOf(4)
)
flow.collectUntilNull {
println(it)
}
我想要的输出是:
[1, 2]
[3, null]
我知道有一个函数
Flow.takeWhile
,但它不会发出谓词返回 false 的值。就我而言,我也想要那个。
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
return@flow collectWhile { value ->
if (predicate(value)) {
emit(value)
true
} else {
// I want a "emit(value)" here
false
}
}
}
由于
collectWhile
是内部的,我无法使用此代码。虽然我想我可以在我的代码中复制粘贴该 collectWhile
实现。但是还有其他方法可以做到这一点吗?
transformWhile
是更灵活/通用的 takeWhile
。
flow
.transformWhile {
emit(it)
myCondition(it)
}
.collect {
println(it)
}
所以,我发现的一种方法看起来像:
var b = true
flow.takeWhile {i -> b.also { b = myCondition(i) } }
.collect {
println(it)
}
把它拿出来作为扩展,
fun <T> Flow<T>.takeUntil(predicate: suspend (T) -> Boolean): Flow<T> {
var b = true
return takeWhile { i ->
b.also { b = predicate(i) }
}
}
// Usage
flow.takeUntil { myCondition(it) }
.collect {
}
有更好的办法吗?
我是这样解决的:
fun <R> Flow<R>.takeUntil(
transform: suspend FlowCollector<R>.(value: R) -> Boolean): Flow<R> {
return transformWhile {
emit(it)
!transform(it)
}
}
和
suspend fun Flow<Int>.onProcessTest(
onSuccess: ((Int) -> Unit)
) {
takeUntil {
it == 5
}.collect { value ->
onSuccess(value)
}
}
fun main(): Unit = runBlocking {
val flows = flow {
emit(1)
emit(2)
emit(3)
emit(4)
emit(5)
emit(6)
emit(7)
}
flows.onProcessTest {
println(it)
}
}
结果:
1
2
3
4
5
Process finished with exit code 0