我有两个测试使用 sleep() 模拟需要时间处理的 api 调用,并测试 Mono.just() 是否使其成为非阻塞。
在我的第一个测试中,我直接发出了一个字符串,但在 map() 中使其阻塞。
@Test
public void testMono() throws InterruptedException {
for (int i = 0; i < 10; i++) {
Mono.just("Hello World")
.map(s -> {
System.out.println("Thread:" + Thread.currentThread().getName() + " " + s);
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.length();
})
.map(integer -> {
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer.toString();
})
.subscribeOn(Schedulers.parallel())
.subscribe(System.out::println);
System.out.println("Loop: " + i + " " + "after MONO");
}
sleep(10000);
}
结果预计是非阻塞的,因为所有输出
System.out.println("Loop: " + i + " " + "after MONO");
同时出现。
但是,在第二个测试中,我将发出的元素从“Hello world”替换为阻塞的 getString() 方法。将 sleep() 放在 getString() 中的目的是,我想模拟需要时间来获取发出的元素的场景。 结果现在是阻塞的,因为输出在完成接收发射的元素后一个一个地显示出来。
@Test
public void testMono2() throws InterruptedException {
for (int i = 0; i < 10; i++) {
Mono.just(getString())
.map(s -> {
System.out.println("Thread:" + Thread.currentThread().getName() + " " + s);
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.length();
})
.map(integer -> {
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer.toString();
})
.subscribeOn(Schedulers.parallel())
.subscribe(System.out::println);
System.out.println("Loop: " + i + " " + "after MONO");
}
sleep(10000);
}
public String getString(){
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello world";
}
在第三个测试中,它也是非阻塞的。
int count = 0;
@Test
public void testFluxSink() throws InterruptedException {
for (int i = 0; i < 10; i++) {
System.out.println("Start Loop: " + i);
int curLoop = count++;
Flux.create(fluxSink -> {
try {
System.out.println("Loop: " + curLoop + " " + " start sleeping");
sleep(5000);
System.out.println("Loop: " + curLoop + " " + " end sleeping");
fluxSink.next(onNext());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).subscribeOn(Schedulers.parallel()).subscribe(s -> {
System.out.println(Thread.currentThread());
System.out.println("Loop: " + curLoop + " " + "completed receiving" + " " + s);
});
}
sleep(100000);
}
public String onNext() throws InterruptedException {
sleep(5000);
return "onNext";
}
想知道我是不是对reactor的概念理解有误,想知道第二次测试怎么用错了?
首先,了解 Reactor 不会执行任何从阻塞调用到非阻塞调用的神奇转换是至关重要的。要制作一个端到端的非阻塞应用程序,您需要使用非阻塞驱动程序、NIO 等。如果您将阻塞代码包装到
Mono
或 Flux
,线程仍然会被阻塞。
至于你的第二个例子,在
getString
方法中睡眠。有两点需要注意:
第一个:你用
Mono.just
。重要的是它通常用于立即提供已知值。这意味着 getString
计算不是发生在反应器链的范围内,而是发生在反应器链组装阶段的主线程中。这就是您看到“顺序”行为的原因。如果将其替换为 Mono.fromCallable
,则计算将在反应器链(并行调度程序线程)中进行,您将看到与 1 或 3 示例中相同的行为。
第二个:重要的是要注意,将您的
getString
方法包装到Mono.fromCallable
不会使您的代码成为非阻塞的。您的 sleep
仍然会停止并行调度程序的线程。在您的生产代码中,您的 getString
方法可能是一些数据库调用或其他服务调用,它们应该通过非阻塞驱动程序或基于 NIO 的库(如 Netty)完成。要模拟它,请使用 delayElement
而不是 sleep
,它以非阻塞方式工作。这是一个例子:
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
getString()
.map(s -> {
System.out.println(Instant.now() + " Thread:" + Thread.currentThread().getName() + " " + s);
return s.length();
})
.subscribeOn(Schedulers.parallel())
.subscribe();
System.out.println(Instant.now() + " Loop: " + i + " " + "after MONO");
}
TimeUnit.SECONDS.sleep(10);
}
public static Mono<String> getString() {
return Mono.delay(Duration.ofSeconds(3))
.map(it -> "hello world");
}
它打印
2023-05-17T20:31:12.099464Z Loop: 0 after MONO
2023-05-17T20:31:12.112953Z Loop: 1 after MONO
2023-05-17T20:31:12.113145Z Loop: 2 after MONO
2023-05-17T20:31:15.105750Z Thread:parallel-2 hello world
2023-05-17T20:31:15.117944Z Thread:parallel-5 hello world
2023-05-17T20:31:15.117944Z Thread:parallel-6 hello world