跳过 Stream 中的最后 x 个元素<T>

问题描述 投票:0回答:3

如果我有

Stream<T>
,我可以轻松地使用
skip(long)
跳过流的前几个元素。然而,似乎没有相当于在流末尾跳过给定数量的元素的方法。

最明显的解决方案是使用

limit(originalLength - elementsToRemoveAtEnd)
,但这需要事先知道初始长度,但情况并非总是如此。

有没有办法删除未知长度流的最后几个元素,而不必将其收集到

Collection
中,计算元素并再次流式传输?

java java-8 java-stream
3个回答
11
投票

对于可能具有未知长度的

Stream
,没有通用的免存储解决方案。但是,您不需要收集整个流,您只需要与要跳过的元素数量一样大的存储空间:

static <T> Stream<T> skipLastElements(Stream<T> s, int count) {
    if(count<=0) {
      if(count==0) return s;
      throw new IllegalArgumentException(count+" < 0");
    }
    ArrayDeque<T> pending=new ArrayDeque<T>(count+1);
    Spliterator<T> src=s.spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        public boolean tryAdvance(Consumer<? super T> action) {
            while(pending.size()<=count && src.tryAdvance(pending::add));
            if(pending.size()>count) {
              action.accept(pending.remove());
              return true;
            }
          return false;
        }
        public Spliterator<T> trySplit() {
            return null;
        }
        public long estimateSize() {
            return src.estimateSize()-count;
        }
        public int characteristics() {
            return src.characteristics();
        }
    }, false);
}
public static void main(String[] args) {
    skipLastElements(Stream.of("foo", "bar", "baz", "hello", "world"), 2)
    .forEach(System.out::println);
}

1
投票

以下代码使用

ArrayDeque
来缓冲
n
元素,其中
n
是最后要跳过的元素数。 诀窍是使用
skip(n)
。 这会导致第一个
n
元素被添加到
ArrayDeque
中。 然后,一旦
n
元素被缓冲,流就会继续处理元素,但从
ArrayDeque
中弹出元素。 当到达流末尾时,最后的
n
元素会卡在
ArrayDeque
中并被丢弃。

ArrayDeque
不允许使用
null
元素。 下面的代码将
null
映射到
NULL_VALUE
,然后添加到
ArrayDeque
,然后从
NULL_VALUE
弹出后将
null
映射回
ArrayDeque

private static final Object NULL_VALUE = new Object();

public static <T> Stream<T> skipLast(Stream<T> input, int n)                   
{
   ArrayDeque<T> queue;

   if (n <= 0)
      return(input);

   queue = new ArrayDeque<>(n + 1);

   input = input.
      map(item -> item != null ? item : NULL_VALUE).
      peek(queue::add).
      skip(n).
      map(item -> queue.pop()).
      map(item -> item != NULL_VALUE ? item : null);

   return(input);
}

0
投票

这是使用即将推出的 Java 24 中的 Stream Gatherers 功能的解决方案:

List<T> result = myStream.gather(skipLast(5)).toList()
private static <T> Gatherer<T, ?, T> skipLast(int n) {
    return Gatherer.<T, Queue<T>, T>ofSequential(
            ArrayDeque::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.add(element);

                if (state.size() == n + 1) {
                    return downstream.push(state.remove());
                } else {
                    return true;
                }
            }));
}

此收集器使用队列作为其状态,将 n 元素保留在其中。 每当保留 n 元素后添加附加元素时,第一个元素就会从队列中删除并发送到流中。 最终结果是除了最后 n 元素之外的所有元素都包含在流中。

以一点复杂性为代价,可以将其增强为可以并行处理的版本(在并行流内):

private static <T> Gatherer<T, ?, T> skipLast(int n) {
    return Gatherer.<T, Queue<T>, T>of(
            ArrayDeque::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.add(element);

                boolean downstreamResult = true;
                while (state.size() > n) {
                    downstreamResult &= downstream.push(state.remove());
                }
                return downstreamResult;
            }),
            (q1, q2) -> {
                q1.addAll(q2);
                return q1;
            },
            (state, downstream) -> {
                while (state.size() > n) {
                    downstream.push(state.remove());
                }
            });
}

当两个状态在此并行版本中组合时,生成的队列的大小可能大于 n。 这在积分器和整理器中都得到了考虑,它们保留处理元素,直到队列大小回到 n

© www.soinside.com 2019 - 2024. All rights reserved.