我有一种变化,所有的价值观同时出现。我想对这些值的块并行执行一项时间密集型任务。为什么这段代码在同一线程上执行每个映射而不是并行执行?
@Test
public void test() throws InterruptedException {
Flux.just("a", "b", "c", "d", "e", "f", "g")
.buffer(2)
.publishOn(Schedulers.parallel())
.map(this::doTimeintensiveStuff)
.doOnNext(val -> {
log.info("[" + Thread.currentThread().getName() + "] " + val);
})
.blockLast();
}
private String doTimeintensiveStuff(List<String> input) {
return String.join(", ", input); // this is just a place holder
}
我的日志输出如下所示
Aug. 07, 2023 4:44:50 PM de.test.ParallelTest lambda$1
INFORMATION: [parallel-1] a, b
Aug. 07, 2023 4:44:50 PM de.test.ParallelTest lambda$1
INFORMATION: [parallel-1] c, d
Aug. 07, 2023 4:44:50 PM de.test.ParallelTest lambda$1
INFORMATION: [parallel-1] e, f
Aug. 07, 2023 4:44:50 PM de.stest.ParallelTest lambda$1
INFORMATION: [parallel-1] g
应该是:
public void test() throws InterruptedException {
Flux.just("a", "b", "c", "d", "e", "f", "g")
.buffer(2)
.parallel()
.runOn(Schedulers.parallel())
.map(this::doTimeintensiveStuff)
.sequential()
.doOnNext(val -> {
log.info("[" + Thread.currentThread().getName() + "] " + val);
})
.blockLast();
}
因为
.publishOn(..)
不会派生新线程,它只是定义当前线程。但是.parallel().runOn(..)
使用并行线程来执行。