Spring Webflux 返回将被异步填充的 Mono

问题描述 投票:0回答:0

我正在尝试返回对将异步填充(通过 MQ)的 http GET 调用的响应。 IE 用户调用 /hello url,该 url 异步地向某个后端系统发出请求,响应返回并且对原始 /hello 调用的响应应该是结果。

我意识到这听起来像是 Spring MVC 的一个用例,但我宁愿这样做不会阻塞线程并且在事件循环范例中而不是每个请求一个线程。

我写了以下代码:

Sinks.Many<String> sinkString = Sinks.many().multicast().directBestEffort();
Flux<String> stringsFlux = sinkString.asFlux();

@GetMapping("hello/{name}")
public Mono<String> sayHello(@PathVariable String name) {
    System.out.println("Awaiting response on thread: " + Thread.currentThread().getName());
    return stringsFlux
            .filter(e -> e.equals(name))
            .next();
}

@GetMapping("helloToYou/{name}")
public Mono<String> respondToHello(@PathVariable String name) {
    System.out.println("Trying to send response");
    sinkString.tryEmitNext(name);
    return Mono.empty();
}

应该首先执行 sayHello 方法,这相当于我正在尝试做的事情——返回一个将在未来某个时间点填充的 Mono。 (过滤器将根据相关 ID 或其他一些因素过滤请求)。

respondToHello 方法等价于我们从MQ 得到消息返回时会发生什么。它将响应写入一个流(stringsFlux),该流应该“填充”与请求过滤器匹配的单声道。

我们的想法是,我们可以释放它去做其他工作,而不是阻塞请求线程直到我们得到响应,当响应在未来的某个时候返回时,它会触发对 stringsFlux 的发布,它将把响应发布到基于过滤器的适当客户端。

在使用 1-3

localhost:8080/hello/name
调用尝试此示例之后,然后调用
localhost:8080/helloToYou/name
Mono 被正确解锁并且 http 请求返回。

在尝试

localhost:8080/hello/name
足够的调用以耗尽 reactor-http-nio 线程数 (8) 之后,系统挂起 - 看起来它在某处阻塞。我启动了我的探查器,试图看看发生了什么,但它在 netty NioEventLoop->WEPoll.wait 的内部。我看不到更多传入请求正在处理,因为我的日志行不再打印出来了。

我尝试将 hello 方法中的 flux 卸载到

.publishOn(Schedulers.boundedElastic())
,我似乎能够获得更多的 reactor-http-nio 线程,但它仍然无法解锁我。

我可以看到 http 调用是如何阻塞等待应用程序的结果的,但我认为返回一个 Mono 是合适的,虽然它现在没有值,但会在某个时候填充未来——毕竟它是一个出版商。当它具有要发布的值时,它应该解析该调用者的 http 操作。

是否可以使用反应式编程/事件循环/非阻塞来满足这个用例,还是我只是使用了完全错误的范例?

spring-webflux reactive-programming project-reactor
© www.soinside.com 2019 - 2024. All rights reserved.