假设我们需要创建一个 Flux
根据内容 Closeable
资源.为了清楚起见,说有一个 BufferedReader
拟转为 Flux<String>
.
BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));
让我们假设 iteratorOfLines
产生一个有限的项目集。
我正在寻找一种方法来关闭 BufferedReader
当 Flux
已经消耗了它的所有数据,或者剩余的数据由于某种原因不需要了(例如订阅中止)。
有一个构造函数 reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose)
但是。
如何清理关闭资源后的正确方法是什么?Flux.fromIterable
发布最后一个项目?
也许,有一个更好的方法比 fromIterable
来做类似的事情,所以欢迎所有的选择。
对于一个等同于资源的尝试,你可以使用 using
Flux.using(
//Set up resource
() -> createReader("my_resource_path"),
//Create flux from resource
reader -> Flux.fromIterable(iteratorOfLines(reader)),
//Perform action (cleanup/close)
//when resource completes/errors/cancelled
reader -> {
try{
reader.close();
}catch(IOException e){
throw Exceptions.propagate(e);
}
}
);