最近我遇到了这个问题:有3个消费者线程需要实现一个无锁队列(不能使用同步),这样就不会阻塞消费线程。假设队列已包含数据。
我想了一会儿,遇到了原子操作,如果仔细使用可以提供帮助。我的实现如下所示。由于数据已经在队列中,我没有实现enqueue
方法并在构造函数中填充数组。
public class SPMCQueue {
private AtomicInteger index = new AtomicInteger(0);
public int[] arr;
public SPMCQueue(int size) {
arr = IntStream.range(0, size).toArray();
}
public Integer dequeue() {
Integer ret = null;
int x = index.getAndIncrement();
if (x < arr.length) {
ret = arr[x];
System.out.println(arr[x] + " by " + Thread.currentThread().getName());
}
else {
throw new RuntimeException("Queue is empty");
}
return ret;
}
}
class QueueTest {
public static void main(String[] args) {
SPMCQueueq = new SPMCQueue(40);
Runnable t1 = () -> {
try {
while (true) {
q.dequeue();
}
}catch(Exception e) {
}
};
Runnable t2 = () -> {
try {
while(true) { q.dequeue(); }
}catch(Exception e) {
}
};
Runnable r3 = () -> {
try {
while(true) { q.dequeue(); }
} catch (Exception e) {
// TODO Auto-generated catch block
//e.printStackTrace();
}
};
Thread thread1 = new Thread(t1);
Thread thread2 = new Thread(t2);
Thread thread3 = new Thread(r3);
thread1.start();
thread2.start();
thread3.start();
}
}
我已经执行了上面的程序,结果显示所有3个消费者都在消耗数据,虽然乱序,有些线程消耗的数据比其他线程多,但我没有看到任何数据在o中多次出现/ p。
我有以下问题:
我想回答一下我的回答:https://stackoverflow.com/a/21101904/7340499,因为它与你的问题类似。
那么,你的问题:
但我没有看到任何数据在o / p中出现多次。
这是预料之中的。因为getAndIncrement()是原子的,并且无论何时访问该函数,您将获得不同的x值,因此输出不同。但是,由于在单个非原子出列操作中组合了“getAndIncrement()”和“System.out.print()”函数,您的输出有时可能会出现故障,例如:你在一个线程上获得x = 1,另一个线程中断,得到x = 2并打印它,然后你的初始线程完成打印。我相信这也指出了你的实施问题,如(1)中所述。您的应用程序是否可以正常处理队列?
实现无锁消费者队列的其他方法有哪些?
嗯,正如你所注意到的那样,原子操作是一种方式。但实质上,它们非常像锁,无论如何。它们只是在较小的规模上运行(但是存在一些实现差异)。所以很难将它们概念化为不同的方法,至少对我自己而言。除此之外,我相信这里有一些不错的资源:Lock Free Queue -- Single Producer, Multiple Consumers