如果我有
Stream<T>
,我可以轻松地使用 skip(long)
跳过流的前几个元素。然而,似乎没有相当于在流末尾跳过给定数量的元素的方法。
最明显的解决方案是使用
limit(originalLength - elementsToRemoveAtEnd)
,但这需要事先知道初始长度,但情况并非总是如此。
有没有办法删除未知长度流的最后几个元素,而不必将其收集到
Collection
中,计算元素并再次流式传输?
对于可能具有未知长度的
Stream
,没有通用的免存储解决方案。但是,您不需要收集整个流,您只需要与要跳过的元素数量一样大的存储空间:
static <T> Stream<T> skipLastElements(Stream<T> s, int count) {
if(count<=0) {
if(count==0) return s;
throw new IllegalArgumentException(count+" < 0");
}
ArrayDeque<T> pending=new ArrayDeque<T>(count+1);
Spliterator<T> src=s.spliterator();
return StreamSupport.stream(new Spliterator<T>() {
public boolean tryAdvance(Consumer<? super T> action) {
while(pending.size()<=count && src.tryAdvance(pending::add));
if(pending.size()>count) {
action.accept(pending.remove());
return true;
}
return false;
}
public Spliterator<T> trySplit() {
return null;
}
public long estimateSize() {
return src.estimateSize()-count;
}
public int characteristics() {
return src.characteristics();
}
}, false);
}
public static void main(String[] args) {
skipLastElements(Stream.of("foo", "bar", "baz", "hello", "world"), 2)
.forEach(System.out::println);
}
以下代码使用
ArrayDeque
来缓冲 n
元素,其中 n
是最后要跳过的元素数。 诀窍是使用skip(n)
。 这会导致第一个 n
元素被添加到 ArrayDeque
中。 然后,一旦 n
元素被缓冲,流就会继续处理元素,但从 ArrayDeque
中弹出元素。 当到达流末尾时,最后的 n
元素会卡在 ArrayDeque
中并被丢弃。
ArrayDeque
不允许使用 null
元素。 下面的代码将 null
映射到 NULL_VALUE
,然后添加到 ArrayDeque
,然后从 NULL_VALUE
弹出后将 null
映射回 ArrayDeque
。
private static final Object NULL_VALUE = new Object();
public static <T> Stream<T> skipLast(Stream<T> input, int n)
{
ArrayDeque<T> queue;
if (n <= 0)
return(input);
queue = new ArrayDeque<>(n + 1);
input = input.
map(item -> item != null ? item : NULL_VALUE).
peek(queue::add).
skip(n).
map(item -> queue.pop()).
map(item -> item != NULL_VALUE ? item : null);
return(input);
}
这是使用即将推出的 Java 24 中的 Stream Gatherers 功能的解决方案:
List<T> result = myStream.gather(skipLast(5)).toList()
private static <T> Gatherer<T, ?, T> skipLast(int n) {
return Gatherer.<T, Queue<T>, T>ofSequential(
ArrayDeque::new,
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
state.add(element);
if (state.size() == n + 1) {
return downstream.push(state.remove());
} else {
return true;
}
}));
}
此收集器使用队列作为其状态,将 n 元素保留在其中。 每当保留 n 元素后添加附加元素时,第一个元素就会从队列中删除并发送到流中。 最终结果是除了最后 n 元素之外的所有元素都包含在流中。
以一点复杂性为代价,可以将其增强为可以并行处理的版本(在并行流内):
private static <T> Gatherer<T, ?, T> skipLast(int n) {
return Gatherer.<T, Queue<T>, T>of(
ArrayDeque::new,
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
state.add(element);
boolean downstreamResult = true;
while (state.size() > n) {
downstreamResult &= downstream.push(state.remove());
}
return downstreamResult;
}),
(q1, q2) -> {
q1.addAll(q2);
return q1;
},
(state, downstream) -> {
while (state.size() > n) {
downstream.push(state.remove());
}
});
}
当两个状态在此并行版本中组合时,生成的队列的大小可能大于 n。 这在积分器和整理器中都得到了考虑,它们保留处理元素,直到队列大小回到 n。