我正在执行一个返回 Mono 的 WebClient get 调用,该调用成功了。但我试图通过在 Mono 上执行 block() 来获取字符串。结果,我得到了异常抛出。
这是我调用的代码:
val responseBody = webClient.get()
.uri("/some-uri")
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono { response -> handleResponse(response) }
.log()
.block()
响应处理程序如下所示:
fun handleResponse(response: ClientResponse): Mono<String> {
if (response.statusCode().is2xxSuccessful()) {
return response.bodyToMono(String::class.java)
}
else if (response.statusCode().is4xxClientError()) {
// Handle client errors (e.g., 404 Not Found)
return Mono.error(RuntimeException("Response not found"));
}
else if (response.statusCode().is5xxServerError()) {
// Handle server errors (e.g., 500 Internal Server Error)
return Mono.error(RuntimeException("Server error"));
}
else {
// Handle other status codes as needed
return Mono.error(RuntimeException("Unexpected error"));
}
}
如果我取出 block(),它可以工作,但我需要将字符串转换为具有不同类列表的自定义复杂对象。
你有两个选择。 第一个也是最好的一个是不阻止。
return webClient.get()
.uri("/some-uri")
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono { handleResponse(it) }
.map { doSthWithIt(it) }
.log()
在
doSthWithIt
内部,您可以获取响应中的内容,更新实体,通过kafka发送它,将字符串转换为带有不同类列表的自定义复杂对象,等等。重点是不要在这里阻塞。
第二个选项是阻止并使用
Schedulers.boundedElastic
(doc):
return Mono.fromCallable {
val response = webClient.get()
.uri("/some-uri")
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono { handleResponse(it) }
.block()
doSthWithIt(response)
}.subscribeOn(Schedulers.boundedElastic())
}
我主要使用java,所以可能会有一些小的语法不匹配,但你明白了。
解释几句:你会因为阻塞而得到异常。反应式库的重点是不阻塞,因此会检测到此类阻塞调用并发出异常信号。项目反应堆还集成了BlockHound,可以更广泛地防止阻塞操作。