如何从 WebFlux 中的 Mono<List<T>> 中提取内容并将其传递到调用链中

问题描述 投票:0回答:2

我希望能够从

List<Payload>
中提取
Mono<List<Payload>>
,将其传递给下游服务进行处理(或者可能从
read(RequestParams params)
方法返回,而不是返回
void
):

@PostMapping("/subset")
    public void read(@RequestBody RequestParams params){
        Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
        
        ....
    }

其中

reader.read(...)
是自动装配 Spring 服务上的一个方法,利用 webClient 从外部 Web 服务 API 获取数据:

public Mono<List<Payload>> read(String date, String assetClasses, String firmAccounts, String id, String password) {
        Flux<Payload> nodes = client
                .get()
                .uri(uriBuilder -> uriBuilder
                    .path("/api/subset")
                    .queryParam("payloads", true)
                    .queryParam("date", date)
                    .queryParam("assetClasses", assetClasses)
                    .queryParam("firmAccounts", firmAccounts)
                    .build())
                .headers(header -> header.setBasicAuth("abc123", "XXXXXXX"))
        .retrieve()
        .onStatus(HttpStatus::is4xxClientError, response -> {
            System.out.println("4xx error");
            return Mono.error(new RuntimeException("4xx"));
        })
        .onStatus(HttpStatus::is5xxServerError, response -> {
            System.out.println("5xx error");
            return Mono.error(new RuntimeException("5xx"));
                })
        .bodyToFlux(Payload.class);
        
        Mono<List<Payload>> records = nodes
                .collectList();
        
        return records;
    }

在 WebFlux 中不允许进行阻塞

result.block()
并抛出异常:

new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread ..." ;

在 WebFlux 中提取 Mono 内容的正确方法是什么? 它是某种

subscribe()
吗? 语法是什么?

spring-boot spring-webflux project-reactor
2个回答
2
投票

没有“正确的方法”,这就是重点。为了获得你需要阻塞的值,而阻塞在 webflux 中是不好的,原因有很多(我现在不会讨论)。

您应该做的是将发布者一直返回给调用客户端。

许多人通常很难理解的一件事是,webflux 与生产者(

Mono
Flux
)和订阅者一起工作。

你的整个服务也是一个生产者,调用的客户端可以看作是订阅者。

将其视为一条长链,从数据源开始,最终在显示数据的客户端中结束。

一个简单的经验法则是,谁是数据的最终消费者,谁就是订阅者,其他人都是生产者。

因此,在您的情况下,您只需将

Mono<List<T>
返回给调用客户端即可。

@PostMapping("/subset")
public Mono<List<Payload>> read(@RequestBody RequestParams params){
    Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
        
    return result;
}

0
投票

虽然以下代码确实返回日志中 Mono 可观察值的值:

@PostMapping("/subset")
@ResponseBody
public  Mono<ResponseEntity<List<Payload>>> read1(@RequestBody RequestParams params){
        Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
                
        return result
                 .map(e -> new ResponseEntity<List<PayloadByStandardBasis>>(e, HttpStatus.OK));
        
    }

我所寻求的理解是使用 WebFlux 组成一系列调用的正确方法,其中一个操作员/分支的响应(由于网络客户端调用而具体化,产生一组记录,如上所述)可以向下游传递给另一个操作员/分支,以促进将这些记录保存在数据库中的副作用,或达到这种效果。

将每个步骤建模为单独的 REST 端点可能是一个好主意,然后为组合操作提供另一个端点,该端点以正确的顺序在内部调用每个独立端点,或者其他设计选择是否更优选?

这最终是我所寻求的理解,因此,如果有人想分享示例代码以及意见以更好地实现上述步骤集,我愿意接受最全面的答案。

谢谢你。

© www.soinside.com 2019 - 2024. All rights reserved.