在 Exchange() 之后调用 bodyToMono 时,block()/blockFirst()/blockLast() 会出现阻塞错误

问题描述 投票:0回答:6

我正在尝试使用 Webflux 将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,API 将返回成功,但使用 DTO 详细说明生成文件时的错误而不是文件本身。这是使用一个非常古老且设计不佳的 api,所以请原谅使用 post 和 api 设计。

API 调用 (exchange()) 的响应是 ClientResponse。从这里,我可以使用 bodyToMono 转换为 ByteArrayResource,它可以流式传输到文件,或者,如果创建文件时出现错误,那么我也可以使用 bodyToMono 转换为 DTO。但是,我似乎无法根据 ClientResponse 标头的内容执行或操作。

在运行时,我收到由

引起的 IllegalStateException

block()/blockFirst()/blockLast() 是阻塞的,线程reactor-http-client-epoll-12 不支持

我认为我的问题是我不能在同一个函数链中调用 block() 两次。

我的代码片段如下:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();

基本上我想根据标头中定义的 MediaType 以不同的方式处理 ClientResponse。

这可能吗?

java reactive-programming spring-webflux project-reactor
6个回答
61
投票

首先,一些内容将帮助您理解解决此用例的代码片段。

  1. 永远不应该在返回响应类型的方法中调用阻塞方法;您将阻塞应用程序的少数线程之一,这对应用程序非常不利
  2. 无论如何,从 Reactor 3.2 开始,反应式管道内的阻塞会引发错误
  3. 正如评论中所建议的,
  4. 致电
    subscribe
    也不是一个好主意。这或多或少就像在单独的线程中作为任务来启动该作业。完成后您将收到回调(可以为
    subscribe
    方法提供 lambda),但实际上您正在将当前管道与该任务解耦。在这种情况下,在您有机会读取完整响应正文并将其写入文件之前,可以关闭客户端 HTTP 响应并清理资源
  5. 如果你不想在内存中缓冲整个响应,Spring 提供了
    DataBuffer
    (想想可以池化的 ByteBuffer 实例)。
  6. 如果您正在实现的方法本身是阻塞的(例如返回
    void
    ),例如在测试用例中,您可以调用 block。

这是您可以用来执行此操作的代码片段:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

正如您所看到的,我们没有在任何地方阻塞,处理 I/O 的方法正在返回

Mono<Void>
,这相当于
done(error)
回调的反应式等价物,它在事情完成时以及是否发生错误时发出信号。

由于我不确定

createErrorFile
方法应该做什么,因此我提供了
createSpreadsheet
的示例,该示例仅将正文字节写入文件。请注意,由于数据缓冲区可能会被回收/池化,因此我们需要在完成后释放它们。

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}

通过此实现,您的应用程序将在给定时间在内存中保存一些

DataBuffer
实例(反应式运算符出于性能原因预取值),并以反应式方式写入字节。


42
投票

[更新2021/10/19]

toProcessor()
现已弃用。

考虑使用

myMono.toFuture().get();

正如投票最多的答案中所述,永远不应该阻止。就我而言,这是唯一的选择,因为我们在命令式代码中使用反应式库。可以通过将单声道包装在处理器中来完成阻塞:

myMono.toProcessor().block()

26
投票

要在服务器请求池之外执行客户端请求,请使用

myWebClientMono.share().block();


3
投票

尝试:

myMono.subscribeOn(Schedulers.boundedElastic()).toFuture().get(5L, TimeUnit.SECONDS)

2
投票

[更新2023/01/31]

我想对此主题进行补充并分享我的解决方案,因为自 5.3 版本以来

exchange()
运算符已弃用

详情:

已弃用。从 5.3 开始,由于可能存在内存泄漏和/或 连接;请使用exchangeToMono(Function), 交换到通量(函数);还可以考虑使用retrieve() 通过 ResponseEntity 提供对响应状态和标头的访问 以及错误状态处理。

因此,我将使用 retrieve() 运算符给出此任务的示例,并以某种方式简化将文件保存到文件系统以及 streaming 方法。

因为它让我们有机会访问标头和响应正文,所以我们可以这样做:

Mono<Void> fileWritten = webClient.get()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .retrieve()                         // using retrieve since exchange() is deprecated
        .toEntityFlux(DataBuffer.class)     // return Mono<ResponseEntity<Flux<DataBuffer>>>
        .flatMap(entity -> {
            // here we can access headers, body and etc. since we have ResponseEntity here
            if (MediaType.APPLICATION_JSON_VALUE.equals(entity.getHeaders().getContentType().toString())) {
                return createFile(entity.getBody(), "no_file_payload_response"); // save no payload body to a file
            } else {
                return createFile(entity.getBody(), "file"); // save file body to a file
            }
        });

fileWritten.subscribe(); // just for testing purposes, subscribe where you want depending on your requirements

将流

Publisher<DataBuffer>
保存到文件系统的方法:

private Mono<Void> createFile(Publisher<DataBuffer> body, String fileName) {
    Path path = Path.of("your_desired_path/" + fileName);
    return DataBufferUtils.write(body, path,
            StandardOpenOption.CREATE_NEW); // use OpenOption you want depending on your requirements
}

此外,如您所见,使用

DataBufferUtils.write()
我们可以直接将流写入文件

我们在这里不使用任何阻塞 API,例如 Input/OutputStream,因此我们当时不会在内存中缓冲文件的全部内容。


-11
投票
RestResultMessage message= createWebClient()
                .get()
                .uri(uri)
                .exchange()
                .map(clientResponse -> {
                    //delegation
                    ClientResponseWrapper wrapper = new 
                                 ClientResponseWrapper(clientResponse);
                    return Mono.just(wrapper);
                })
                .block() //wait until request is not done
                .map(result -> {  
                    //convert to any data
                    if (!result.statusCode().isError()){
                       //extract the result from request
                        return create(RestResultMessage.Result.success, result.bodyToMono(String.class).block());}
                    } else {
                        return create(RestResultMessage.Result.error, result.statusCode().name());
                    }
                })
                .block();
© www.soinside.com 2019 - 2024. All rights reserved.