flatMap 中的单声道超时 - 在“flatMap”中的 20000 毫秒内没有观察到任何项目或终端信号(并且未配置后备)

问题描述 投票:0回答:1

我正在使用 Spring WebFlux 发出 POST 请求,但收到以下错误:

java.util.concurrent.TimeoutException:
没有观察到任何项目或 ‘flatMap’中 20000ms 内的终端信号
(并且没有后备措施 配置)

这是我的代码:

public Mono<ResponseEntity<XXXResponse>> invokeXXXService(XXXRequest request) {

        return webClient
                .baseUrl(XXXUrl)
                .build()
                .post()
                .contentType(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromValue(request))
                .exchangeToMono(clientResponse -> clientResponse.toEntity(XXXResponse.class))
                .timeout(java.time.Duration.ofMillis(20000))
                .onErrorResume(throwable -> {
                    LOG.error("Errors while getting res from XXX");
                    return Mono.just(ResponseEntity.status(500).body(null));
                });
    }

看起来请求在 20 秒后超时。

  • 什么可能导致我的 WebFlux WebClient 中出现此 TimeoutException?
  • 增加超时是否有帮助,或者是否有更好的方法来处理?如果是这样,我该如何使用 Reactor 来做到这一点?

任何更优雅地处理这个问题的技巧都会很棒!

java spring spring-webflux webclient project-reactor
1个回答
0
投票

我想我明白了你的问题。

查看类似的 github 问题:

Memory leak with webclient timeout

  • 什么可能导致我的 WebFlux WebClient 中出现此 TimeoutException?

这是因为整个操作/过程花费了超过

20s
的时间 调用 API。并且
Mono.timeout
考虑到 从建立连接到每一次的时间 得到回应。因此,如果您 想要更精细的超时控制。

增加超时是否有帮助,或者是否有更好的方法来处理?如果是这样,我该如何使用 Reactor 来做到这一点?
解决方案是增加
Mono.timeout

中的超时,但是 将是一个临时修复。不是永久修复。

解决这个问题的更好方法是引入

Mono.timeout

进行回复,如果您不使用

responseTimeout
,请不要使用 不确定。

Mono.timeout
注意:

您可以通过 HttpClient client = HttpClient.create().responseTimeout(Duration.ofSeconds(20000)); WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build(); public Mono<ResponseEntity<XXXResponse>> invokeXXXService(XXXRequest request) { return webClient .baseUrl(XXXUrl) .build() .post() .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromValue(request)) .exchangeToMono(clientResponse -> clientResponse.toEntity(XXXResponse.class)) .onErrorResume(WebClientRequestException.class, ex -> { if (ex instanceof ReadTimeoutException) { // do some processing... return Mono.just("do something when timeout happens"); } else { LOG.error("Errors while getting res from XXX"); return Mono.just(ResponseEntity.status(500).body(null)); exception ..."); } }); } 粒度控制

request
级别、
connection
级别、
read
/
write
级别等超时。
这只是一个例子。根据您的要求进行相应调整。

© www.soinside.com 2019 - 2024. All rights reserved.