我想分享(即分割)我的通量,但
share()
似乎不会导致我的订阅被共享。为什么?
我有一个由昂贵的数据库调用发出的
Flux
。我想分割该通量并以不同的方式处理它产生的值(但not使用groupBy()
运算符)。后来,我想再次组合不同的路径,这样我只需订阅一次(例如通过 REST 控制器):
static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
return Flux.generate(
() -> {
System.out.println("subscribed"); // should only happen once
return 0;
},
(state, sink) -> {
sink.next(state);
return state + 1;
})
.map(String::valueOf)
.log()
.delayElements(Duration.ofSeconds(1))
.take(2)
.share() // share the flux so the DB is only queried once
;
}
static Flux<String> pathA() {
// complicated calculations
return expensiveDatabaseCall().doOnNext(it -> System.out.println("a: " + it));
}
static Flux<String> pathB() {
// different, equally complicated calculations
return expensiveDatabaseCall().doOnNext(it -> System.out.println("b: " + it));
}
static Flux<String> controller() { // pretend this happens in a REST controller
return Flux.merge(pathA(), pathB());
}
@Test
void test() {
StepVerifier.create(controller()).expectNextCount(4).verifyComplete();
}
由于我使用的是
share()
运算符,我本以为只会看到一个订阅 - 但实际上我看到了两个 "subscribed"
。为什么?
share()
操作符不是应该订阅上游并自行处理所有下游订阅,而不是将它们传递回上游源吗?
这至少是我对docs的理解;他们是这么说的:
返回一个新的 Flux,它多播(共享)原始 Flux。 [...]
解释非常简单:任何缓存/共享行为都绑定到通量实例。这意味着当你这样做时:
public Flux<Integer> sharedCountdown() {
return Flux.just(3, 2, 1, 0).share();
}
var instanceA = sharedCountdown();
var instanceB = sharedCountdown();
您创建两个不同的通量,每个通量都有自己的订阅和缓存。这与任何其他普通 Java 对象完全相同。
如果要共享,则必须在下游处理中使用相同的实例。
就你而言,你必须颠倒逻辑。 您的后处理器不应该自己调用/创建昂贵的通量,它们应该接收组装的实例作为输入。另外,如果您想确保所有下游处理器接收所有上游信号,您应该避免
share()
,而使用 publish().autoconnect(numberOfPostProcessors):
static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
return Flux.generate(
() -> {
System.out.println("subscribed"); // should only happen once
return 0;
},
(state, sink) -> {
sink.next(state);
return state + 1;
})
.map(String::valueOf)
.log()
.delayElements(Duration.ofSeconds(1))
.take(2)
;
}
static Flux<String> pathA(Flux<String> upstreamFlux) {
// complicated calculations
return upstreamFlux.doOnNext(it -> System.out.println("a: " + it));
}
static Flux<String> pathB(Flux<String> upstreamFlux) {
// different, equally complicated calculations
return upstreamFlux.doOnNext(it -> System.out.println("b: " + it));
}
static Flux<String> controller() { // pretend this happens in a REST controller
var sharedExpensiveUpstream = expensiveDatabaseCall().publish().autoconnect(2);
return Flux.merge(pathA(sharedExpensiveUpstream), pathB(sharedExpensiveUpstream));
}