How to invoke a blocking IO call on a Mono


I have utility methods like these.

    public static WebClient.ResponseSpec retrieve(final String baseUrl, final Duration responseTimeout) {
        // ...
    }

    public static <T> Flux<T> retrieveBodyToFlux(final String baseUrl, final Duration responseTimeout,
                                                 final Class<T> elementClass) {
        return retrieve(baseUrl, responseTimeout)
                .bodyToFlux(elementClass);
    }

    public static Mono<Void> download(final String baseUrl, final Duration responseTimeout,
                                      final Path destination, final OpenOption... openOptions) {
        return DataBufferUtils.write(
                retrieveBodyToFlux(baseUrl, responseTimeout, DataBuffer.class),
                destination,
                openOptions
        );
    }

Now I want to add another method that lets clients work with a temporary file.

    // Let me download the file, and you can just consume the file!
    public static Mono<Void> download(final String baseUrl, final Duration responseTimeout,
                                      final Consumer<? super Path> consumer) {

        // Create a temp file
        // download the URL to the file
        // pass the file to the consumer; possibly blocking IO operations
        // don't forget to delete the file!
    }

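How can I do this without getting any blocking-call warnings?

I tried the following, and it seems to work, but I would like to understand what I am doing. Is it optimal? Is there another way to do it?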

        return Mono.usingWhen(
                        Mono.fromCallable(() -> Files.createTempFile(null, null)).subscribeOn(Schedulers.boundedElastic()),
                        p -> download(baseUrl, responseTimeout, p, StandardOpenOption.WRITE)
                                .then(Mono.just(p))
                                .doOnNext(consumer).subscribeOn(Schedulers.boundedElastic()),
                        p -> Mono.fromCallable(() -> {
                            Files.delete(p);
                            return null;
                        }).publishOn(Schedulers.boundedElastic()).then()
                )
                .then();

Tags: spring-webflux, project-reactor
1 Answer

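Run each blocking step in its own Mono and subscribe it on Schedulers.boundedElastic(), so that neither the file operations nor the consumer execute on a non-blocking thread:

    return Mono.usingWhen(
            // Resource acquisition: create the temp file on the elastic scheduler
            Mono.fromCallable(() -> Files.createTempFile(null, null))
                    .subscribeOn(Schedulers.boundedElastic()),

            // Resource usage: download first, then hand the file to the (possibly blocking) consumer
            tempFile -> download(baseUrl, responseTimeout, tempFile, StandardOpenOption.WRITE)
                    .then(Mono.fromRunnable(() -> consumer.accept(tempFile))
                            .subscribeOn(Schedulers.boundedElastic())),

            // Resource cleanup: applied on completion, error, or cancellation
            tempFile -> Mono.fromRunnable(() -> {
                try {
                    Files.delete(tempFile);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }).subscribeOn(Schedulers.boundedElastic()) // subscribeOn, not publishOn, so the delete itself is offloaded
    ).then();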
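
For comparison, the acquire/use/cleanup contract that Mono.usingWhen provides above corresponds roughly to a plain try/finally in blocking code. The sketch below is not Reactor code and is not part of the original answer. The names `TempFileLifecycle`, `Writer`, and `withTempFile` are hypothetical, `Writer` only stands in for the download step, and the temp-file prefix and suffix are arbitrary choices.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

final class TempFileLifecycle {

    /** Hypothetical stand-in for the download step: writes content to the given file. */
    @FunctionalInterface
    interface Writer {
        void writeTo(Path destination) throws IOException;
    }

    /**
     * Blocking analogue of usingWhen(acquire, use, cleanup):
     * the temp file is deleted whether the use step succeeds or throws.
     */
    static void withTempFile(final Writer writer, final Consumer<? super Path> consumer)
            throws IOException {
        final Path tempFile = Files.createTempFile("download-", ".tmp"); // acquire
        try {
            writer.writeTo(tempFile);  // use: fill the file (the "download")
            consumer.accept(tempFile); // use: let the caller read it
        } finally {
            Files.deleteIfExists(tempFile); // cleanup
        }
    }

    public static void main(final String[] args) throws IOException {
        final Path[] seen = new Path[1];
        withTempFile(
                p -> Files.write(p, "hello".getBytes(StandardCharsets.UTF_8)),
                p -> seen[0] = p);
        System.out.println("exists after use: " + Files.exists(seen[0]));
    }
}
```

In the reactive version each of these three steps is wrapped in its own Mono and moved to Schedulers.boundedElastic(), because none of them may block the calling thread.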
