/**
 * Assembles the [WebClient] used to talk to `https://example.com`.
 *
 * The client is backed by a default Apache HttpComponents async client and has the
 * response filter returned by [Filter] installed.
 *
 * @return the configured [WebClient]
 */
private fun buildWebClient(): WebClient =
    WebClient.builder()
        .baseUrl("https://example.com")
        // Default async HttpComponents client, adapted to Spring's reactive connector API.
        .clientConnector(HttpComponentsClientHttpConnector(HttpAsyncClients.createDefault()))
        .filter(Filter())
        .build()
/**
 * Builds a response-processing [ExchangeFilterFunction].
 *
 * The filter reads the response body as a String, decodes it into a [ResponseDTO] and,
 * when the DTO reports success, returns a mutated response whose body is rebuilt from
 * the string that was read. Any other DTO state throws [IllegalArgumentException].
 *
 * NOTE(review): this is the version that produces the reported
 * "The client response body can only be consumed once" error — kept unchanged as the
 * reproduction case.
 */
private fun Filter(): ExchangeFilterFunction {
return ExchangeFilterFunction.ofResponseProcessor { response ->
// The body is read here; cache() only replays the decoded String to later subscribers
// of this Mono, it does not make the underlying response body re-readable.
response.bodyToMono(String::class.java).cache().flatMap { body ->
val res = json().decodeFromString<ResponseDTO>(body)
when {
res.isSuccess() -> {
// Re-wrap the already-read text so the returned response carries a body again.
val bufferFactory = DefaultDataBufferFactory()
val cachedBody = bufferFactory.wrap(body.toByteArray())
// NOTE(review): the reported IllegalStateException most likely originates here —
// the builder from mutate() presumably still references the original body, which
// was already consumed by bodyToMono above. TODO confirm against Spring's
// DefaultClientResponseBuilder.
Mono.just(response.mutate().body(Flux.just(cachedBody)).build())
}
else -> {
throw IllegalArgumentException("error")
}
}
}
}
}
如何修复错误?
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: The client response body can only be consumed once
Caused by: java.lang.IllegalStateException: The client response body can only be consumed once
at org.springframework.http.client.reactive.AbstractClientHttpResponse.lambda$singleSubscription$0(AbstractClientHttpResponse.java:65) ~[spring-web-6.1.5.jar:6.1.5]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s):
我找不到有关修复过滤器的信息... 请帮助我
此问题仅在使用异步客户端时出现
我想修复错误 这对我来说太难了
也许你可以试试这个:
/**
 * Builds a response-processing [ExchangeFilterFunction] that validates the payload
 * before the response reaches the caller.
 *
 * The body is read exactly once as a String and decoded into a [ResponseDTO]. When the
 * DTO reports success, a replacement response carrying the same status, headers, cookies
 * and body text is emitted; otherwise the returned Mono fails.
 *
 * The replacement is created with `ClientResponse.create(...)` rather than
 * `response.mutate().body(...)`: the builder returned by `mutate()` starts out holding the
 * original body and drains it when a new body is set, which is a second subscription to a
 * body that `bodyToMono` has already consumed. With connectors that enforce single
 * subscription this fails with "The client response body can only be consumed once".
 *
 * @return the filter to register via `WebClient.Builder.filter`
 * @throws IllegalArgumentException signalled through the returned Mono when the decoded
 *   [ResponseDTO] is not successful
 */
private fun Filter(): ExchangeFilterFunction =
    ExchangeFilterFunction.ofResponseProcessor { response ->
        response.bodyToMono(String::class.java).flatMap { body ->
            val res = json().decodeFromString<ResponseDTO>(body)
            if (res.isSuccess()) {
                // Re-wrap the text that was read so downstream consumers still get a body.
                val cachedBody = DefaultDataBufferFactory().wrap(body.toByteArray())
                // Fully qualified so this block does not depend on an extra import.
                val modifiedResponse = org.springframework.web.reactive.function.client.ClientResponse
                    .create(response.statusCode(), response.strategies())
                    .headers { it.addAll(response.headers().asHttpHeaders()) }
                    .cookies { it.addAll(response.cookies()) }
                    .body(Flux.just(cachedBody))
                    .build()
                Mono.just(modifiedResponse)
            } else {
                // Signal the failure through the pipeline instead of throwing inside the mapper.
                Mono.error(IllegalArgumentException("error"))
            }
        }
    }
在更新后的代码中,去掉了 cache(),直接在 flatMap 运算符中一次性读取响应正文。然后,使用读取到的正文创建一个新的 modifiedResponse 对象并返回它。