我正在玩 Project Reactor 并面临不直观的行为。
举以下例子:
Mono.fromCallable(() -> calculate())
//.log()
.publishOn(Schedulers.boundedElastic())
.doOnNext(i -> System.out.println("thread: " + Thread.currentThread().getName()))
.block();
当我取消注释
log()
时,calculate()
在 main
线程上运行,但是当我将其留在其中时,该方法将在 boundedElastic
上执行。为什么 log()
会改变 Mono 的行为?
如果您查看
publishOn
方法的实现,您会注意到它有一个特定的情况,即在 Callable
之后立即调用它。在这种情况下,它返回 MonoSubscribeOnCallable
并且可调用函数在传递给 publishOn
的调度程序上执行。这就是您看到这种行为的原因。
但是,如果您在
.log
和 fromCallable
之间添加 publishOn
运算符,则不适用这种特殊情况。这是因为 this
指针现在指向 MonoLogFuseable
,因此,MonoPublishOn
以默认行为返回。
public final Mono<T> publishOn(Scheduler scheduler) {
if(this instanceof Callable) {
if (this instanceof Fuseable.ScalarCallable) {
try {
T value = block();
return onAssembly(new MonoSubscribeOnValue<>(value, scheduler));
}
catch (Throwable t) {
//leave MonoSubscribeOnCallable defer error
}
}
@SuppressWarnings("unchecked")
Callable<T> c = (Callable<T>)this;
return onAssembly(new MonoSubscribeOnCallable<>(c, scheduler));
}
return onAssembly(new MonoPublishOn<>(this, scheduler));
}
这是引入此更改的提交:https://github.com/reactor/reactor-core/commit/41d9dae7256ffc36a07db7a9f5fa4d95182a5ad9
不幸的是,我找不到任何票证参考或解释来清楚地理解为什么它会这样工作。