Java 流管道定义可以在评估之前准备好吗?

问题描述 投票:0回答:1

我正在尝试(但失败了)研究一些非常具体的东西。在代码中像

int[] nums;
// ...
String s = Arrays.stream(nums)
    .unordered()
    .parallel()
    .mapToObj(Integer::toString)
    .sorted(myComparator)
    .collect(Collectors.joining());

Java 文档

nums
描述为 ,将
sorted
之前的所有内容描述为 中间操作,将
collect
描述为终端操作

我想向流库传递尽可能多的管道定义,而不给出源代码。理想情况下,这将包括终端操作,但即使不包括,这也会是一个胜利。

目的是“二阶惰性”:不仅是流的通常评估惰性,而且是预先遍历管道定义,并可能在管道输入数据之前减少它们。这是为了减少频繁调用的流代码的启动成本,将定义移动到类上可重用的

static final
,并且能够将管道重用和组合为对象而不是调用函数。

选择 API 代码本身,在这个准备阶段,我想要像 IntPipeline.mapToObj:

这样的方法
private <U> Stream<U> mapToObj(IntFunction<? extends U> mapper, int opFlags) {
    return new ReferencePipeline.StatelessOp<>(this, StreamShape.INT_VALUE, opFlags) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
            return new Sink.ChainedInt<>(sink) {
                @Override
                public void accept(int t) {
                    downstream.accept(mapper.apply(t));
                }
            };
        }
    };
}
在有可用源之前

实际被执行(直接或间接)。至少在理论上这似乎是可能的,因为这些代码看起来都足够懒——它只启动依赖于某些标志的嵌入类,而不是流数据内容。但是,如果我需要直接使用这个

Pipeline
类来做我想做的事情,那么我似乎运气不好,因为它没有公开暴露。

在理想情况下,最终产品看起来像类上的一个

static final
对象,通过一两次构造后调用,接受整数数组或早期整数流(相当于直接在
Arrays.stream()
之后) )并生成一个字符串或后期流(相当于直接在
collect()
之前)。

nums
示例是一个玩具示例,显然此策略对于我见过的一些较大的 20 行以上流定义会有更多好处。

java java-stream
1个回答
0
投票

你写道:

这是为了减少频繁调用的流代码的启动成本,将定义移动到类上可重用的

static final
,并且能够将管道重用和组合为对象而不是调用函数。

这至少存在两个基本问题:

  1. A

    Stream
    不可重复使用。这与懒惰或数据绑定无关;这与数据绑定无关。 Stream API 就是这样设计的。您只能使用 Stream 管道一次。

  2. 您的想法是假设正在进行大量值得优化的准备工作。我在参考实现的源代码中看不到任何支持这个想法的内容。但如果有疑问,请使用公正的分析工具来衡量实际工作并确定时间花在哪里。
    如果准备工作占用了整个执行时间的很大一部分,通常表明您没有足够的数据值得使用并行流。

但是要从字面上解决您的想法,您可以使用以下代码:

public class PreparedStream {
    final Stream<String> prepared;
    int[] actualData;

    public PreparedStream(){
        this(Comparator.naturalOrder());
    }

    public PreparedStream(Comparator<? super String> myComparator) {
        // already implies .unordered() .parallel() ──────────┬──┐
        prepared = StreamSupport.intStream(this::spliterator, 0, true)
            .mapToObj(Integer::toString)
            .sorted(myComparator);
    }

    public String execute(int[] nums) {
        actualData = nums;
        return prepared.collect(Collectors.joining());
    }

    private Spliterator.OfInt spliterator() {
        return Spliterators.spliterator(actualData, 0);
    }
}

例如

PreparedStream s = new PreparedStream();
String result = s.execute(new int[] { 9, 8, 7, 3, 1, 2 });

在没有提供实际数据的情况下构建 Stream 管道。然而,如前所述,这并没有改变 Stream 管道是一次性使用的事实,因此,

PreparedStream
的实例也只能使用一次。

此外,在调用

execute
之前准备好的流管道基本上是一个链表。没有进行其他准备工作,因为只有在终端操作开始时才知道的最终状态是最相关的。例如,您可以根据需要多次在并行和顺序之间切换,在终端操作中激活的最后一个状态将决定整体操作模式。另外,终端操作是否短路也是影响最大的因素之一。

您可以更进一步,在单独的线程中执行终端操作,在查询 spliterator 并且终端操作的一些准备工作已经完成时停止它,以等待提供实际的开始调用数据。

public class PreparedTerminalOperation {
    final SynchronousQueue<int[]> actualData = new SynchronousQueue<>();
    final CompletableFuture<String> prepared;

    public PreparedTerminalOperation(){
        this(Comparator.naturalOrder());
    }

    public PreparedTerminalOperation(Comparator<? super String> myComparator) {
        prepared = CompletableFuture.supplyAsync(() ->
            StreamSupport.intStream(this::spliterator, 0, true)
                .mapToObj(Integer::toString)
                .sorted(myComparator)
                .collect(Collectors.joining()),
            Executors.newVirtualThreadPerTaskExecutor());
    }

    public String execute(int[] nums) {
        try {
            actualData.put(nums);
            return prepared.join();
        } catch(InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    private Spliterator.OfInt spliterator() {
        try {
            return Spliterators.spliterator(actualData.take(), 0);
        } catch(InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}

还是一样的用法

PreparedTerminalOperation s = new PreparedTerminalOperation();
String result = s.execute(new int[] { 9, 8, 7, 3, 1, 2 });

但即使是现在,在

execute
电话会议之前转移的工作时间也不太可能出现差异。如前所述,我认为 Stream 实现中没有任何可以产生影响的工作。即使该对象是可重用的(但现在仍然不能),也不会有显着的节省。

© www.soinside.com 2019 - 2024. All rights reserved.