我有类似这样的实用方法。
public static WebClient.ResponseSpec retrieve(final String baseUrl, final Duration responseTimeout) {
// ...
}
public static <T> Flux<T> retrieveBodyToFlux(final String baseUrl, final Duration responseTimeout,
final Class<T> elementClass) {
return retrieve(baseUrl, responseTimeout)
.bodyToFlux(elementClass);
}
public static Mono<Void> download(final String baseUrl, final Duration responseTimeout,
final Path destination, final OpenOption... openOptions) {
return DataBufferUtils.write(
retrieveBodyToFlux(baseUrl, responseTimeout, DataBuffer.class),
destination,
openOptions
);
}
现在我想添加另一种方法让客户端使用临时文件。
// Let me download the file, and you can just cononsume the file!
public static Mono<Void> download(final String baseUrl, final Duration responseTimeout,
final Consumer<? super Path> consumer) {
// Create a temp file
// download the URL to the file
// accept the file to the consumer; possibly blocking IO operations
// don't forget to delete the file!
}
我怎样才能在没有任何阻塞呼叫警告的情况下完成这项工作?
我尝试过(并且似乎有效),但我想知道我在做什么。它是最优的吗?我还有其他办法吗?
return Mono.usingWhen(
Mono.fromCallable(() -> Files.createTempFile(null, null)).subscribeOn(Schedulers.boundedElastic()),
p -> download(baseUrl, responseTimeout, p, StandardOpenOption.WRITE)
.then(Mono.just(p))
.doOnNext(consumer).subscribeOn(Schedulers.boundedElastic()),
p -> Mono.fromCallable(() -> {
Files.delete(p);
return null;
}).publishOn(Schedulers.boundedElastic()).then()
)
.then();
Mono.usingWhen(
// Resource acquisition
Mono.fromCallable(() -> Files.createTempFile(null, null)).subscribeOn(Schedulers.boundedElastic()),
// Resource usage
tempFile -> download(baseUrl, responseTimeout, tempFile, StandardOpenOption.WRITE)
.then(Mono.fromRunnable(() -> consumer.accept(tempFile)).subscribeOn(Schedulers.boundedElastic())),
// Resource cleanup
tempFile -> Mono.fromRunnable(() -> {
try {
Files.delete(tempFile);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).subscribeOn(Schedulers.boundedElastic())
).then();