我有一个用例,我需要缓存通量响应并重用它
对于前。这里,
invokeEndpoint
调用外部端点来获取响应。收到回复后,我需要发送收到的原始回复和 aggregate
d 回复。
使用下面的代码,聚合方法也会进行相同的端点调用,我试图避免这种情况,因为获取响应的请求很繁重。
我尝试在
cache
上使用 share
和 data
,但它似乎不起作用,我看到正在进行冗余调用,我试图避免这些调用。
private Flux<Integer> getResponse(List<String> requests){
Flux<Integer> data = Flux.fromIterable(requests)
.flatMap(this::invokeEndpoint, 10);
// .share(); // no luck
// .cache(); // no luck
return Flux.concat(data, aggregate(data));
}
private Mono<Integer> aggregate(Flux<Integer> res){
return res.reduce((a, b) -> a + b); //redundant call
}
文档没有多大帮助,知道我们如何缓存和重用 Flux 吗?
考虑下面的代码。每个请求都会调用一次重端点。回复将在最后汇总。
@Test
void test() {
Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
.flatMap(this::invokeEndpoint, 10)
.flatMap(response -> sendOriginalResponse(response).thenReturn(response))
.reduce(Integer::sum)
.as(StepVerifier::create)
.expectNext(55)
.verifyComplete();
}
private Mono<Integer> sendOriginalResponse(Integer response) {
return Mono.just(1)
.doOnNext(i -> System.out.println("sending original response " + response));
}
private Mono<Integer> invokeEndpoint(Integer i) {
return Mono.just(i * i).delayElement(Duration.ofSeconds(5))
.doOnNext(x -> System.out.println("invoke heavy endpoint for " + i));
}