我希望能够从
List<Payload>
中提取 Mono<List<Payload>>
,将其传递给下游服务进行处理(或者可能从 read(RequestParams params)
方法返回,而不是返回 void
):
@PostMapping("/subset")
public void read(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
....
}
其中
reader.read(...)
是自动装配 Spring 服务上的一个方法,利用 webClient 从外部 Web 服务 API 获取数据:
public Mono<List<Payload>> read(String date, String assetClasses, String firmAccounts, String id, String password) {
Flux<Payload> nodes = client
.get()
.uri(uriBuilder -> uriBuilder
.path("/api/subset")
.queryParam("payloads", true)
.queryParam("date", date)
.queryParam("assetClasses", assetClasses)
.queryParam("firmAccounts", firmAccounts)
.build())
.headers(header -> header.setBasicAuth("abc123", "XXXXXXX"))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> {
System.out.println("4xx error");
return Mono.error(new RuntimeException("4xx"));
})
.onStatus(HttpStatus::is5xxServerError, response -> {
System.out.println("5xx error");
return Mono.error(new RuntimeException("5xx"));
})
.bodyToFlux(Payload.class);
Mono<List<Payload>> records = nodes
.collectList();
return records;
}
在 WebFlux 中不允许进行阻塞
result.block()
并抛出异常:
new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread ..." ;
在 WebFlux 中提取 Mono 内容的正确方法是什么? 它是某种
subscribe()
吗? 语法是什么?
没有“正确的方法”,这就是重点。为了获得你需要阻塞的值,而阻塞在 webflux 中是不好的,原因有很多(我现在不会讨论)。
您应该做的是将发布者一直返回给调用客户端。
许多人通常很难理解的一件事是,webflux 与生产者(
Mono
或 Flux
)和订阅者一起工作。
你的整个服务也是一个生产者,调用的客户端可以看作是订阅者。
将其视为一条长链,从数据源开始,最终在显示数据的客户端中结束。
一个简单的经验法则是,谁是数据的最终消费者,谁就是订阅者,其他人都是生产者。
因此,在您的情况下,您只需将
Mono<List<T>
返回给调用客户端即可。
@PostMapping("/subset")
public Mono<List<Payload>> read(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
return result;
}
虽然以下代码确实返回日志中 Mono 可观察值的值:
@PostMapping("/subset")
@ResponseBody
public Mono<ResponseEntity<List<Payload>>> read1(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
return result
.map(e -> new ResponseEntity<List<PayloadByStandardBasis>>(e, HttpStatus.OK));
}
我所寻求的理解是使用 WebFlux 组成一系列调用的正确方法,其中一个操作员/分支的响应(由于网络客户端调用而具体化,产生一组记录,如上所述)可以向下游传递给另一个操作员/分支,以促进将这些记录保存在数据库中的副作用,或达到这种效果。
将每个步骤建模为单独的 REST 端点可能是一个好主意,然后为组合操作提供另一个端点,该端点以正确的顺序在内部调用每个独立端点,或者其他设计选择是否更优选?
这最终是我所寻求的理解,因此,如果有人想分享示例代码以及意见以更好地实现上述步骤集,我愿意接受最全面的答案。
谢谢你。