我有一个值的流量,我需要 "合并",直到它们满足某个条件。我不想使用外部变量作为缓冲区来检查我的条件是否得到满足,但我找不到一种方法来进行 "reduceWhile "或 "bufferWhile "操作,因为我没有访问缓冲区的权限(用 .buffer
)来检查我是否有足够的数据将结果传递给下游。
我有。
ohlcIntfFlux
.reduce(OHLCIntf::mergeWith)
.filter(timeFrameProvider::isBarComplete)
.map(this::makeBar)
.subscribe(pub::next)
但这是先减少整个数据流,然后检查过滤器。
我需要减少数据,直到满足过滤器的条件,然后将其传递到下游。
希望它足够清晰...
谢谢你!我有一个值的流量,它是由过滤器中的值组成的。
我是这样做的。
ConnectableFlux.create(pub ->
ohlcIntfFlux
.scan((i, acc) -> {
if (!timeFrameProvider.isBarComplete(acc))
return acc.mergeWith(i);
pub.next(makeBar(acc));
return i;
})
.subscribe()
)
它做了我想做的事,但如果你能提出更实用的方法,我会很高兴听到。