如何从提供 Mono 的函数正确生成 Flux?

问题描述 投票:0回答:1

我想写一段代码来响应式地遍历数据库(下一个查询取决于上一个查询的结果)。

我的问题可以简化如下: 假设我们有这样的功能

// some long running call
private Mono<Integer> calculateNext(Integer value) {
    return Mono.defer(() -> Mono.just(value + 1))
        .delayElement(Duration.ofSeconds(1L));
}

如何从中创建一个包含重复调用函数结果的 Flux?

以下作品

Flux.generate(
    () -> Mono.just(0),
    (previousResult, sink) -> {
        var nextResult = calculateNext(previousResult.block());
        sink.next(nextResult);
        return nextResult;
    }
).flatMap(it -> (Mono<Integer>) it)
.takeUntil(it -> it > 1000)
.doOnNext(it -> LOG.info("it = " + it))
.subscribeOn(Schedulers.boundedElastic())
.blockLast();

并产生从 1 到 1000 递增的整数。

但似乎使用

.block()
违背了使用 Fluxes 和 Monos 的目的,并可能导致性能问题。

这个

(previousResult, sink) -> {
    var nextResult = previousResult.flatMap(it -> calculateNext(it));
    sink.next(nextResult);
    return nextResult;
}

也有效,但看起来有点可怕,因为它似乎相当于链接一千个

.flatMap
调用。

java project-reactor
1个回答
0
投票

您可以使用

Mono.expand
递归地获取结果,并使用上一个查询的结果来检索下一个结果。

private Flux<Integer> fetch(int start, int end) {
    return calculateNext(start)
            .expand(res -> calculateNext(res))
            .takeUntil(res -> res == end);
}

@Test
void test() {
    StepVerifier.create(fetch(0, 5))
            .expectNext(1, 2, 3, 4, 5)
            .verifyComplete();
}
© www.soinside.com 2019 - 2024. All rights reserved.