Stream API 和队列:订阅 BlockingQueue 流式

问题描述 投票:0回答:3

假设我们有一个队列

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

其他一些线程将值放入其中,然后我们像这样读取它

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

如何以流方式迭代此队列,同时保持与上述代码类似的语义。

这段代码只遍历当前队列状态:

queue.stream().forEach(e -> System.out.println(e));
java java-8 java-stream queue
3个回答
39
投票

我猜测了一些你的期望,但我想我有一个很好的预感。

队列的流,就像遍历队列一样,代表队列的“当前内容”。当迭代器或流到达队列尾部时,它不会阻塞等待添加更多元素。迭代器或流在该点耗尽并且计算终止。 如果您想要一个由队列的所有当前和未来元素组成的流,您可以执行以下操作:

Stream.generate(() -> { try { return queue.take(); } catch (InterruptedException ie) { return "Interrupted!"; } }) .filter(s -> s.endsWith("x")) .forEach(System.out::println);

(不幸的是,需要处理
InterruptedException

使得这变得非常混乱。)


请注意,无法关闭队列,并且

Stream.generate

无法停止生成元素,因此这实际上是一个无限流。终止它的唯一方法是使用短路流操作,例如

findFirst
    


19
投票

public class QueueSpliterator<T> implements Spliterator<T> { private final BlockingQueue<T> queue; private final long timeoutMs; public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) { this.queue = queue; this.timeoutMs = timeoutMs; } @Override public int characteristics() { return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED; } @Override public long estimateSize() { return Long.MAX_VALUE; } @Override public boolean tryAdvance(final Consumer<? super T> action) { try { final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS); if (next == null) { return false; } action.accept(next); return true; } catch (final InterruptedException e) { throw new SupplierErrorException("interrupted", e); } } @Override public Spliterator<T> trySplit() { return null; } }

处理InterruptedException抛出的异常是RuntimeException的扩展。使用此类,可以通过以下方式构建流:
    StreamSupport.stream(new QueueSpliterator(...))
并添加常用的流操作。


15
投票
cyclops-react

(我是该项目的开发人员)提供了一个 async.Queue,它允许您异步(且干净地)从队列中填充和消费。 例如

Queue<String> queue = QueueFactories.<String>unboundedQueue().build();

或者简单地(只要这是一个 com.aol.simple.react.async.Queue)

Queue<String> queue = new Queue<>();

然后在一个单独的线程中:

new Thread(() -> { while (true) { queue.add("New message " + System.currentTimeMillis()); } }).start();

回到主线程,您的原始代码现在应该按预期工作(无限迭代添加到队列中的消息并将其打印出来)

queue.stream().forEach(e -> System.out.println(e));

队列和流可以在任何阶段通过 -
关闭

queue.close();

	
© www.soinside.com 2019 - 2024. All rights reserved.