像操作者一样模仿 "reduceWhile"。

问题描述 投票:0回答:1

我有一个值的流量,我需要 "合并",直到它们满足某个条件。我不想使用外部变量作为缓冲区来检查我的条件是否得到满足,但我找不到一种方法来进行 "reduceWhile "或 "bufferWhile "操作,因为我没有访问缓冲区的权限(用 .buffer)来检查我是否有足够的数据将结果传递给下游。

我有。

                ohlcIntfFlux
                        .reduce(OHLCIntf::mergeWith)
                        .filter(timeFrameProvider::isBarComplete)
                        .map(this::makeBar)
                        .subscribe(pub::next)

但这是先减少整个数据流,然后检查过滤器。

我需要减少数据,直到满足过滤器的条件,然后将其传递到下游。

希望它足够清晰...

谢谢你!我有一个值的流量,它是由过滤器中的值组成的。

spring spring-webflux project-reactor
1个回答
0
投票

我是这样做的。

ConnectableFlux.create(pub ->
                ohlcIntfFlux
                        .scan((i, acc) -> {
                            if (!timeFrameProvider.isBarComplete(acc))
                                return acc.mergeWith(i);

                            pub.next(makeBar(acc));
                            return i;
                        })
                        .subscribe()
        )

它做了我想做的事,但如果你能提出更实用的方法,我会很高兴听到。

© www.soinside.com 2019 - 2024. All rights reserved.