我正在考虑实现一个无锁循环数组。一个问题是以无锁方式维护头指针和尾指针。我想到的代码是:
int circularIncrementAndGet(AtomicInteger i) {
i.compareAndSet(array.length - 1, -1);
return i.incrementAndGet();
}
然后我会做类似的事情:
void add(double value) {
int idx = circularIncrementAndGet(tail);
array[idx] = value;
}
(请注意,如果数组已满,旧值将被覆盖,我对此没意见)。
有人认为这个设计有问题吗?我怀疑可能存在我没有看到的竞争条件。
更简单的方法是使用 2 的幂并执行以下操作。
final double[] array;
final int sizeMask;
final AtomicInteger i = new AtomicInteger();
public CircularBuffer(int size) {
assert size > 1 && ((size & (size -1)) == 0); // test power of 2.
array = new double[size];
sizeMask = size -1;
}
void add(double value) {
array[i.getAndIncrement() & sizeMask] = value;
}
查看disruptor:http://lmax-exchange.github.io/disruptor/,它是Java中的开源无锁循环缓冲区。
是的,存在竞争条件。
说
i = array.length - 2
,两个线程进入circularIncrementAndGet()
:
Thread 1: i.compareAndSet(array.length - 1, -1) results in i = array.length - 2
Thread 2: i.compareAndSet(array.length - 1, -1) results in i = array.length - 2
Thread 1: i.incrementAndGet() results in i = array.length - 1
Thread 2: i.incrementAndGet() results in i = array.length
当线程 2 到达 ArrayIndexOutOfBoundsException
时,导致
array[idx] = value
(以及对 add()
的所有后续调用,直到 i
溢出)。
@Peter Lawrey 提出的解决方案不会遇到这个问题。
如果您坚持以下限制:
可以实现循环数组/队列。
入队线程拥有尾指针。出列线程拥有头指针。除了一个条件外,到目前为止,这两个线程不共享任何状态,因此没有问题。
这个条件是测试空或满。
认为空意味着 head == tail;考虑 full 表示 tail == head - 1 模数组大小。入队必须检查队列是否已满,出队必须检查队列是否为空。您需要浪费数组中的一个索引来检测满和空之间的差异 - 如果您排队到最后一个存储桶中,那么满将是
head == tail
而空将是 head == tail
现在你陷入了死锁 - 你认为你是同时又空又满,所以什么工作都完成不了。
在执行这些检查时,可能会在比较时更新一个值。然而,由于这两个值是单调递增的,因此不存在正确性问题:
这是我多年前在 Dobb 博士的实现中使用的设计,它对我很有帮助:
https://github.com/jianhong-li/LockFreeRingBuffer 这是我用java语言编写的LockFreeRingBuffer。
public class LockFreeRingBuffer<T> {
public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LockFreeRingBuffer.class);
private final AtomicReferenceArray<T> buffer;
private final int bufferSize;
private final long bufferSizeMask;
private final AtomicLong writeIndex = new AtomicLong(0);
private final AtomicLong readIndex = new AtomicLong(0);
public LockFreeRingBuffer(int bufferSize) {
// Check if bufferSize is positive
if (bufferSize <= 1) {
throw new IllegalArgumentException("bufferSize must be positive");
}
// Check if bufferSize is power of 2
int zCnt = 0;
int _bufferSize = bufferSize;
while (_bufferSize > 0) {
if ((_bufferSize & 1) == 1) {
zCnt++;
}
if (zCnt > 1) {
throw new IllegalArgumentException("bufferSize must be power of 2");
}
_bufferSize = _bufferSize >> 1;
}
// Initialize buffer and bufferSize
this.buffer = new AtomicReferenceArray<>(bufferSize);
this.bufferSize = bufferSize;
this.bufferSizeMask = bufferSize - 1;
}
public int push(T value) {
// Ensure that the written data is valid
if (value == null) {
return -1;
}
long pWrite, pRead;
int loopCnt = 0;
for (; ; ) {
int _rIndex = makeIndex(pRead = readIndex.get());
int _wIndex = makeIndex(pWrite = writeIndex.get()); // push . _wIndex . Expect to read the latest version.
if (nextIndex(pWrite) == _rIndex) {
// buffer is full
return -2;
}
// Make sure that the current write pointer points to a NULL slot. That is, it can be written to. (Make sure that the take side has cleaned up the data
)
if (buffer.get(_wIndex) != null) {
if ((++loopCnt) > 16) {
logger.trace("TRACE: push data retry [01] - buffer[{}] is not null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
_wIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
Thread.yield();
}
continue;
}
// Update the pointer first, then write the value. Make sure the ownership is written correctly
if (writeIndex.compareAndSet(pWrite, pWrite + 1)) {
// Write value: Theoretically this position must be empty to write
if (buffer.compareAndSet(_wIndex, null, value)) {
// writeCnt.incrementAndGet();
return _wIndex;
}
// can not happen
throw new RuntimeException("state error");
}
}
}
public T pop() {
int loopCnt = 0;
long pRead, pWrite;
for (; ; ) {
// P_w == P_r , buffer is empty
int _rIndex = makeIndex(pRead = readIndex.get());
int _wIndex = makeIndex(pWrite = writeIndex.get());
if (_rIndex == _wIndex) {
// buffer is empty
return null;
}
T t = buffer.get(_rIndex); // There is no need to determine null here. However, it is a snapshot of pRead. So there might be a null situation.
if (t == null) {
if ((++loopCnt) > 16) {
logger.trace("TRACE: pop data retry [20] - buffer[{}] is null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
_rIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
Thread.yield();
}
continue;
}
/* ************************************************
* pWrite
* |
* v
* [] -> [] -> [] -> [] -> [] -> [] -> [] -> []
* ^
* |
* pRead
* ************************************************
* case: pRead = 1, pWrite = 1
* pWrite = pWrite + 1 = 2
* But we haven't had time to write the data yet. In this case, pRead = 1, pWrite = 2. The pRead location data is empty
.
* now, t==null will continue directly.
* after many loop,value at pRead effective finnaly. Then it is also normal to put the pRead value +1. To indicate that the ownership of pos_1 location data has been obtained.
* Then it is also normal to put the pRead value +1. To indicate that the ownership of pos_1 location data has been obtained.
*/
if (readIndex.compareAndSet(pRead, pRead + 1)) {
// pRead+1,
// Data indicating that the original pointer position can be safely manipulated. And the above is guaranteed to read a non-null value. That is, the competitive write with push is complete.
//
// set to null
boolean compareAndSet = buffer.compareAndSet(_rIndex, t, null);
// must be success if code has no bug
if (compareAndSet) {
// CAS success. t must be valid
return t;
}
logger.error("ERROR: pop_data_error - set to null failed, pRead: {} ({}) , pWrite: {} ({})readIndex:{} writeIndex:{}",
pRead, _rIndex, pWrite, _wIndex, readIndex.get(), writeIndex.get());
// can not happen
throw new RuntimeException("state error");
}
}
}
/**
* this function maybe inline by JIT.
*/
private int nextIndex(long currentIndex) {
return (int) ((currentIndex + 1) & bufferSizeMask);
}
/**
* this function maybe inline by JIT.
*/
private int makeIndex(long currentIndex) {
return (int) (currentIndex & bufferSizeMask);
}
// ======================== get / setter =======================
public long getReadCnt() {
return readIndex.get();
}
public long getWriteCnt() {
return writeIndex.get();
}
}