我想写一段代码来响应式地遍历数据库(下一个查询取决于上一个查询的结果)。
我的问题可以简化如下: 假设我们有这样的功能
// some long running call
private Mono<Integer> calculateNext(Integer value) {
return Mono.defer(() -> Mono.just(value + 1))
.delayElement(Duration.ofSeconds(1L));
}
如何从中创建一个包含重复调用函数结果的 Flux?
以下作品
Flux.generate(
() -> Mono.just(0),
(previousResult, sink) -> {
var nextResult = calculateNext(previousResult.block());
sink.next(nextResult);
return nextResult;
}
).flatMap(it -> (Mono<Integer>) it)
.takeUntil(it -> it > 1000)
.doOnNext(it -> LOG.info("it = " + it))
.subscribeOn(Schedulers.boundedElastic())
.blockLast();
并产生从 1 到 1000 递增的整数。
但似乎使用
.block()
违背了使用 Fluxes 和 Monos 的目的,并可能导致性能问题。
这个
(previousResult, sink) -> {
var nextResult = previousResult.flatMap(it -> calculateNext(it));
sink.next(nextResult);
return nextResult;
}
也有效,但看起来有点可怕,因为它似乎相当于链接一千个
.flatMap
调用。
您可以使用
Mono.expand
递归地获取结果,并使用上一个查询的结果来检索下一个结果。
private Flux<Integer> fetch(int start, int end) {
return calculateNext(start)
.expand(res -> calculateNext(res))
.takeUntil(res -> res == end);
}
@Test
void test() {
StepVerifier.create(fetch(0, 5))
.expectNext(1, 2, 3, 4, 5)
.verifyComplete();
}