Spring WebFlux 缓存结果 Flux 并复用它

问题描述 投票:0回答:1

我有一个用例,我需要缓存通量响应并重用它

对于前。这里,

invokeEndpoint
调用外部端点来获取响应。收到回复后,我需要发送收到的原始回复和
aggregate
d 回复。

使用下面的代码,聚合方法也会进行相同的端点调用,我试图避免这种情况,因为获取响应的请求很繁重。

我尝试在

cache
上使用
share
data
,但它似乎不起作用,我看到正在进行冗余调用,我试图避免这些调用。

private Flux<Integer> getResponse(List<String> requests){
   Flux<Integer> data = Flux.fromIterable(requests)
      .flatMap(this::invokeEndpoint, 10);
   //   .share(); // no luck
   //   .cache(); // no luck
   return Flux.concat(data, aggregate(data));
}

private Mono<Integer> aggregate(Flux<Integer> res){
   return res.reduce((a, b) -> a + b); //redundant call
}

文档没有多大帮助,知道我们如何缓存和重用 Flux 吗?

java spring-webflux flux
1个回答
0
投票

考虑下面的代码。每个请求都会调用一次重端点。回复将在最后汇总。

@Test
void test() {
    Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
        .flatMap(this::invokeEndpoint, 10)
        .flatMap(response -> sendOriginalResponse(response).thenReturn(response))
        .reduce(Integer::sum)
        .as(StepVerifier::create)
        .expectNext(55)
        .verifyComplete();
}

private Mono<Integer> sendOriginalResponse(Integer response) {
    return Mono.just(1)
        .doOnNext(i -> System.out.println("sending original response " + response));
}

private Mono<Integer> invokeEndpoint(Integer i) {
    return Mono.just(i * i).delayElement(Duration.ofSeconds(5))
        .doOnNext(x -> System.out.println("invoke heavy endpoint for " + i));
}
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.