假设我们有一个队列
BlockingQueue<String> queue= new LinkedBlockingQueue<>();
其他一些线程将值放入其中,然后我们像这样读取它
while (true) {
String next = queue.take();
System.out.println("next message:" + next);
}
如何以流方式迭代此队列,同时保持与上述代码类似的语义。
这段代码只遍历当前队列状态:
queue.stream().forEach(e -> System.out.println(e));
我猜测了一些你的期望,但我想我有一个很好的预感。
队列的流,就像遍历队列一样,代表队列的“当前内容”。当迭代器或流到达队列尾部时,它不会阻塞等待添加更多元素。迭代器或流在该点耗尽并且计算终止。 如果您想要一个由队列的所有当前和未来元素组成的流,您可以执行以下操作:
Stream.generate(() -> {
try {
return queue.take();
} catch (InterruptedException ie) {
return "Interrupted!";
}
})
.filter(s -> s.endsWith("x"))
.forEach(System.out::println);
(不幸的是,需要处理
InterruptedException
使得这变得非常混乱。)
请注意,无法关闭队列,并且
Stream.generate
无法停止生成元素,因此这实际上是一个无限流。终止它的唯一方法是使用短路流操作,例如
findFirst
。public class QueueSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> queue;
private final long timeoutMs;
public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
this.queue = queue;
this.timeoutMs = timeoutMs;
}
@Override
public int characteristics() {
return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
try {
final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
if (next == null) {
return false;
}
action.accept(next);
return true;
} catch (final InterruptedException e) {
throw new SupplierErrorException("interrupted", e);
}
}
@Override
public Spliterator<T> trySplit() {
return null;
}
}
处理InterruptedException抛出的异常是RuntimeException的扩展。使用此类,可以通过以下方式构建流: StreamSupport.stream(new QueueSpliterator(...)) 并添加常用的流操作。
(我是该项目的开发人员)提供了一个 async.Queue,它允许您异步(且干净地)从队列中填充和消费。 例如
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
或者简单地(只要这是一个 com.aol.simple.react.async.Queue)
Queue<String> queue = new Queue<>();
然后在一个单独的线程中:
new Thread(() -> {
while (true) {
queue.add("New message " + System.currentTimeMillis());
}
}).start();
回到主线程,您的原始代码现在应该按预期工作(无限迭代添加到队列中的消息并将其打印出来)
queue.stream().forEach(e -> System.out.println(e));
队列和流可以在任何阶段通过 -关闭
queue.close();