我有一个单消费者、多生产者无锁队列 (MPSCQueue) 与 std::counting_semaphore 相结合,以在新项目入队时通知消费者。消费者使用 dequeue() 尝试获取项目;如果成功,它会调用 sema.try_acquire() ,我希望每次队列中确实有一个项目时它都会成功。但是,在极少数情况下,即使队列返回了有效项目,我也会收到断言失败,表明 try_acquire() 返回 false。
我可以确认确实只有一个消费者线程。没有其他地方调用 dequeue()。该代码使用 std::memory_order_seq_cst 进行队列中的原子操作,并使用 sema.release()/sema.acquire() 进行信号量。
这似乎是一个内存排序或可见性问题,消费者在看到信号量上相应的release()操作之前先看到队列中新链接的节点。但我认为 seq_cst 加上信号量释放/获取足以保证正确的排序。关于如何确保我们永远不会在队列中看到有效项目但 try_acquire() 仍然失败的任何想法或说明?
#include <atomic>
#include <cassert>
#include <iostream>
#include <optional>
#include <semaphore>
template <typename T>
class MPSCQueue {
struct Node {
T data;
std::atomic<Node*> next;
// Default constructor for the dummy node
Node() : next(nullptr) {}
// Constructor that moves the data in
Node(T data_) : data(std::move(data_)), next(nullptr) {}
};
// Atomic head pointer for multiple producers
std::atomic<Node*> head;
// Tail pointer for the single consumer
Node* tail;
public:
std::atomic_size_t enqueue_count = 0;
size_t dequeue_count = 0;
MPSCQueue() {
Node* dummy = new Node();
head.store(dummy, std::memory_order_seq_cst);
tail = dummy;
}
~MPSCQueue() {
Node* node = tail;
while (node) {
Node* next = node->next.load(std::memory_order_seq_cst);
delete node;
node = next;
}
}
// Called by producers
void enqueue(T data) {
enqueue_count.fetch_add(1);
Node* node = new Node(std::move(data));
// Swap in the new node as the head
Node* prev_head = head.exchange(node, std::memory_order_seq_cst);
// Link the old head to the new node
prev_head->next.store(node, std::memory_order_seq_cst);
}
// Called by the single consumer
std::optional<T> dequeue() {
// Check the next pointer of the tail
Node* next = tail->next.load(std::memory_order_seq_cst);
if (next) {
// Move the data out
T res = std::move(next->data);
delete tail;
tail = next;
dequeue_count += 1;
return res;
}
return std::nullopt;
}
size_t size() { return enqueue_count.load() - dequeue_count; }
};
template <typename T>
class MPSCQueueConsumerLock {
MPSCQueue<T> queue;
std::counting_semaphore<> sema{0};
public:
void enqueue(T data) {
queue.enqueue(std::move(data));
// Release the semaphore to notify the consumer
sema.release();
}
// Single consumer calls this
T dequeue() {
auto re = queue.dequeue();
if (re.has_value()) {
// We have an item, so we expect the semaphore count to be > 0
if (!sema.try_acquire()) {
// Unexpectedly fails in rare cases
std::cerr << __FILE__ << ":" << __FUNCTION__
<< " sema.try_acquire() should succeed, please check\n";
assert(false);
}
return re.value();
}
// Otherwise, block until something is available
sema.acquire();
return queue.dequeue().value();
}
size_t size() { return queue.size(); }
};
有时(概率很低,但仍有可能)代码会命中:
/path/to/mpsc.hpp:dequeue sema try_acquire should be success, please check
python: /path/to/mpsc.hpp:79: T MPSCQueueConsumerLock<T>::dequeue() [...]
Assertion `false' failed.
Aborted (core dumped)
我已经验证确实只有一个消费者线程,并且这是调用 dequeue() 的唯一地方。
怎么可能消费者可以看到 MPSCQueue 中的新节点,但没有观察到相应的 sema.release() 调用(因此 try_acquire() 失败)? std::counting_semaphore 或我缺少的内存顺序是否有微妙之处? 有什么建议可以确保消费者对队列的看法和信号量的计数保持一致吗?
任何指示或建议将不胜感激。谢谢!
您的支票中有竞争条件。
void enqueue(T data) {
queue.enqueue(std::move(data));
// thread suspended by OS here
sema.release();
}
T dequeue() {
auto re = queue.dequeue();
if (re.has_value()) {
// thread gets an item while the other thread is suspended
// semaphore not incremented yet !
if (!sema.try_acquire()) {
...
}
如果你有 2 个原子操作 A 和 B,并且 A 在 B 之前发生,如果你看到 B 那么你可以说 A 发生了,但是看到 A 发生并不能推断出 B 上的任何内容,你需要在出队之前递减信号量。