我正在尝试(但失败了)研究一些非常具体的东西。在代码中像
int[] nums;
// ...
String s = Arrays.stream(nums)
.unordered()
.parallel()
.mapToObj(Integer::toString)
.sorted(myComparator)
.collect(Collectors.joining());
nums
描述为 源,将 sorted
之前的所有内容描述为 中间操作,将 collect
描述为终端操作。
我想向流库传递尽可能多的管道定义,而不给出源代码。理想情况下,这将包括终端操作,但即使不包括,这也会是一个胜利。
目的是“二阶惰性”:不仅是流的通常评估惰性,而且是预先遍历管道定义,并可能在管道输入数据之前减少它们。这是为了减少频繁调用的流代码的启动成本,将定义移动到类上可重用的
static final
,并且能够将管道重用和组合为对象而不是调用函数。
选择 API 代码本身,在这个准备阶段,我想要像 IntPipeline.mapToObj:
这样的方法private <U> Stream<U> mapToObj(IntFunction<? extends U> mapper, int opFlags) {
return new ReferencePipeline.StatelessOp<>(this, StreamShape.INT_VALUE, opFlags) {
@Override
Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
return new Sink.ChainedInt<>(sink) {
@Override
public void accept(int t) {
downstream.accept(mapper.apply(t));
}
};
}
};
}
在有可用源之前实际被执行(直接或间接)。至少在理论上这似乎是可能的,因为这些代码看起来都足够懒——它只启动依赖于某些标志的嵌入类,而不是流数据内容。但是,如果我需要直接使用这个
Pipeline
类来做我想做的事情,那么我似乎运气不好,因为它没有公开暴露。
在理想情况下,最终产品看起来像类上的一个
static final
对象,通过一两次构造后调用,接受整数数组或早期整数流(相当于直接在 Arrays.stream()
之后) )并生成一个字符串或后期流(相当于直接在 collect()
之前)。
nums
示例是一个玩具示例,显然此策略对于我见过的一些较大的 20 行以上流定义会有更多好处。
你写道:
这是为了减少频繁调用的流代码的启动成本,将定义移动到类上可重用的
,并且能够将管道重用和组合为对象而不是调用函数。static final
这至少存在两个基本问题:
A
Stream
不可重复使用。这与懒惰或数据绑定无关;这与数据绑定无关。 Stream API 就是这样设计的。您只能使用 Stream 管道一次。
您的想法是假设正在进行大量值得优化的准备工作。我在参考实现的源代码中看不到任何支持这个想法的内容。但如果有疑问,请使用公正的分析工具来衡量实际工作并确定时间花在哪里。
如果准备工作占用了整个执行时间的很大一部分,通常表明您没有足够的数据值得使用并行流。
但是要从字面上解决您的想法,您可以使用以下代码:
public class PreparedStream {
final Stream<String> prepared;
int[] actualData;
public PreparedStream(){
this(Comparator.naturalOrder());
}
public PreparedStream(Comparator<? super String> myComparator) {
// already implies .unordered() .parallel() ──────────┬──┐
prepared = StreamSupport.intStream(this::spliterator, 0, true)
.mapToObj(Integer::toString)
.sorted(myComparator);
}
public String execute(int[] nums) {
actualData = nums;
return prepared.collect(Collectors.joining());
}
private Spliterator.OfInt spliterator() {
return Spliterators.spliterator(actualData, 0);
}
}
例如
PreparedStream s = new PreparedStream();
String result = s.execute(new int[] { 9, 8, 7, 3, 1, 2 });
在没有提供实际数据的情况下构建 Stream 管道。然而,如前所述,这并没有改变 Stream 管道是一次性使用的事实,因此,
PreparedStream
的实例也只能使用一次。
此外,在调用
execute
之前准备好的流管道基本上是一个链表。没有进行其他准备工作,因为只有在终端操作开始时才知道的最终状态是最相关的。例如,您可以根据需要多次在并行和顺序之间切换,在终端操作中激活的最后一个状态将决定整体操作模式。另外,终端操作是否短路也是影响最大的因素之一。
您可以更进一步,在单独的线程中执行终端操作,在查询 spliterator 并且终端操作的一些准备工作已经完成时停止它,以等待提供实际的开始调用数据。
public class PreparedTerminalOperation {
final SynchronousQueue<int[]> actualData = new SynchronousQueue<>();
final CompletableFuture<String> prepared;
public PreparedTerminalOperation(){
this(Comparator.naturalOrder());
}
public PreparedTerminalOperation(Comparator<? super String> myComparator) {
prepared = CompletableFuture.supplyAsync(() ->
StreamSupport.intStream(this::spliterator, 0, true)
.mapToObj(Integer::toString)
.sorted(myComparator)
.collect(Collectors.joining()),
Executors.newVirtualThreadPerTaskExecutor());
}
public String execute(int[] nums) {
try {
actualData.put(nums);
return prepared.join();
} catch(InterruptedException e) {
throw new IllegalStateException(e);
}
}
private Spliterator.OfInt spliterator() {
try {
return Spliterators.spliterator(actualData.take(), 0);
} catch(InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
还是一样的用法
PreparedTerminalOperation s = new PreparedTerminalOperation();
String result = s.execute(new int[] { 9, 8, 7, 3, 1, 2 });
但即使是现在,在
execute
电话会议之前转移的工作时间也不太可能出现差异。如前所述,我认为 Stream 实现中没有任何可以产生影响的工作。即使该对象是可重用的(但现在仍然不能),也不会有显着的节省。