创建Flux.fromIterable后如何运行onClose操作?

问题描述 投票:0回答:1

假设我们需要创建一个 Flux 根据内容 Closeable 资源.为了清楚起见,说有一个 BufferedReader 拟转为 Flux<String>.

BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));

让我们假设 iteratorOfLines 产生一个有限的项目集。

我正在寻找一种方法来关闭 BufferedReaderFlux 已经消耗了它的所有数据,或者剩余的数据由于某种原因不需要了(例如订阅中止)。

有一个构造函数 reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose)但是。

  1. 似乎无法从反应堆的公共API中获取(甚至是过境)。
  2. 我怀疑它是否有帮助,因为它不包括Flux在得到iterable中最后一个项目之前停止的情况。

如何清理关闭资源后的正确方法是什么?Flux.fromIterable 发布最后一个项目?

也许,有一个更好的方法比 fromIterable 来做类似的事情,所以欢迎所有的选择。

java project-reactor flux reactor autocloseable
1个回答
2
投票

对于一个等同于资源的尝试,你可以使用 using

    Flux.using(
            //Set up resource
            () -> createReader("my_resource_path"),
            //Create flux from resource
            reader -> Flux.fromIterable(iteratorOfLines(reader)),
            //Perform action (cleanup/close) 
            //when resource completes/errors/cancelled
            reader -> {
                try{
                    reader.close();
                }catch(IOException e){
                    throw Exceptions.propagate(e);
                }
            }
    );
© www.soinside.com 2019 - 2024. All rights reserved.