我有一个方法
@Service
public class MyService {
public Mono<Integer> processData() {
... // very long reactive operation
}
}
在正常的程序流程中,我通过 Kafka 事件异步调用此方法。
出于测试目的,我需要 将该方法公开为 Web 服务,但该方法应公开为异步:仅返回 HTTP 代码
202 ACCEPTED
并继续在后台进行数据处理。
仅调用
Mono#subscribe()
并从控制器方法返回可以吗?
@RestController
@RequiredArgsConstructor
public class MyController {
private final MyService service;
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
还是这样做更好(这里我对 IntelliJ 的警告感到困惑,也许与 https://youtrack.jetbrains.com/issue/IDEA-276018 相同?):
public Mono<Void> processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
return Mono.empty();
}
或者其他解决方案?
仅调用 Mono#subscribe() 并从控制器方法返回可以吗?
有副作用,但你可能可以接受它们:
您可能会同意第一点,或者您可能希望在反应链中进一步记录一些错误,作为某种副作用,这样如果出现问题,您至少会收到内部通知。
对于第二点 - 我建议在方法调用上设置一个(慷慨的)超时,这样如果它没有在设定的时间内完成,它至少会被取消,并且不再消耗资源。如果您正在运行异步任务,那么这不是一个大问题,因为它只会消耗一点内存。如果您在弹性调度程序上包装阻塞调用,那么情况会更糟,因为您无限期地将线程绑定在该线程池中。
我还想问为什么你需要在这里使用有界弹性调度程序 - 它用于包装阻塞调用,这似乎不是这个用例的基础。 (需要明确的是,如果您的服务阻塞,那么您绝对应该将其包装在弹性调度程序上 - 但如果不是,则没有理由这样做。)
最后是这个例子:
public Mono<Void> processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
return Mono.empty();
}
...是一个很好的例子,说明了“不应该做什么”,因为您正在创建一种“冒名顶替者反应方法” - 有人可能非常合理地订阅返回的发布者,认为它会在底层发布者完成时完成,这显然不是这里发生的事情。在这种情况下,使用 void
返回类型并因此不返回任何内容是正确的做法。
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
这实际上是您在
@Scheduled
方法中所做的事情,该方法只是不返回任何内容,并且您显式订阅
Mono
或 Flux
以便发出元素。