我有一个项目列表,我需要从不同来源检索数据、合并并返回这些项目。这些项目是完全独立的,获取数据将花费最多的时间,所以我希望它并行完成。我以为我可以使用 Flux.fromStream() 和 Mono.zipWith() 在 Reactor 中做到这一点,但它似乎在等待每个项目完成后再开始下一个项目。
这是我写的一个测试来说明这个问题。该代码制作了 10 个三明治,并模拟了获取花生酱和果冻的随机延迟:
class FluxOrderingTest {
private final Random random = new Random();
@Test
void testAsyncSandwich() {
Flux<String> sandwiches = Flux.fromStream(IntStream.rangeClosed(1, 10).boxed())
.flatMap(number -> getJelly(number).zipWith(getPeanutButter(number)))
.map(this::makeSandwich);
StepVerifier.create(sandwiches)
.expectNextCount(10)
.verifyComplete();
}
private String makeSandwich(Tuple2<String, String> ingredients) {
System.out.printf("Combined %s with %s%n", ingredients.getT1(), ingredients.getT2());
return "Sandwich";
}
private Mono<String> getPeanutButter(Integer number) {
return Mono.fromSupplier(() -> withDelay("the peanut butter for " + number));
}
private Mono<String> getJelly(Integer number) {
return Mono.fromSupplier(() -> withDelay("the jelly for " + number));
}
private String withDelay(String s) {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Got " + s);
return s;
}
}
在输出中,我预计三明治会乱序完成,因为获取配料所需的时间是随机的。但是,Reactor 会按顺序完成每个三明治。
Got the jelly for 1
Got the peanut butter for 1
Combined the jelly for 1 with the peanut butter for 1
Got the jelly for 2
Got the peanut butter for 2
Combined the jelly for 2 with the peanut butter for 2
Got the jelly for 3
Got the peanut butter for 3
Combined the jelly for 3 with the peanut butter for 3
Got the jelly for 4
Got the peanut butter for 4
Combined the jelly for 4 with the peanut butter for 4
Got the jelly for 5
Got the peanut butter for 5
Combined the jelly for 5 with the peanut butter for 5
Got the jelly for 6
Got the peanut butter for 6
Combined the jelly for 6 with the peanut butter for 6
Got the jelly for 7
Got the peanut butter for 7
Combined the jelly for 7 with the peanut butter for 7
Got the jelly for 8
Got the peanut butter for 8
Combined the jelly for 8 with the peanut butter for 8
Got the jelly for 9
Got the peanut butter for 9
Combined the jelly for 9 with the peanut butter for 9
Got the jelly for 10
Got the peanut butter for 10
Combined the jelly for 10 with the peanut butter for 10
既然我不关心最后的订单,我该如何安排同时取走所有的原料?
您的代码是合成的,具有实际 IO 的真实代码的行为会有所不同,因为它会为 IO 使用不同的Schedulers。当然,这取决于您如何进行 IO。如果你使用反应式 IO,你应该没有任何问题。
在您的情况下,它只是一个调度程序,可以立即运行所有内容,即在一个线程中。
尝试订阅不同的调度程序:
.flatMap(number ->
getJelly(number)
.subscribeOn(Schedulers.boundedElastic())
.zipWith(
getPeanutButter(number)
.subscribeOn(Schedulers.boundedElastic())
)
)