出队项目存在,但 std::counting_semaphore::try_acquire() 在单消费者 MPSC 队列中失败

问题描述 投票:0回答:1

我有一个单消费者、多生产者无锁队列 (MPSCQueue) 与 std::counting_semaphore 相结合,以在新项目入队时通知消费者。消费者使用 dequeue() 尝试获取项目;如果成功,它会调用 sema.try_acquire() ,我希望每次队列中确实有一个项目时它都会成功。但是,在极少数情况下,即使队列返回了有效项目,我也会收到断言失败,表明 try_acquire() 返回 false。

我可以确认确实只有一个消费者线程。没有其他地方调用 dequeue()。该代码使用 std::memory_order_seq_cst 进行队列中的原子操作,并使用 sema.release()/sema.acquire() 进行信号量。

为什么会发生这种情况?

这似乎是一个内存排序或可见性问题,消费者在看到信号量上相应的release()操作之前先看到队列中新链接的节点。但我认为 seq_cst 加上信号量释放/获取足以保证正确的排序。关于如何确保我们永远不会在队列中看到有效项目但 try_acquire() 仍然失败的任何想法或说明?

#include <atomic>
#include <cassert>
#include <iostream>
#include <optional>
#include <semaphore>

template <typename T>
class MPSCQueue {
  struct Node {
    T data;
    std::atomic<Node*> next;

    // Default constructor for the dummy node
    Node() : next(nullptr) {}
    // Constructor that moves the data in
    Node(T data_) : data(std::move(data_)), next(nullptr) {}
  };

  // Atomic head pointer for multiple producers
  std::atomic<Node*> head;
  // Tail pointer for the single consumer
  Node* tail;

 public:
  std::atomic_size_t enqueue_count = 0;
  size_t dequeue_count = 0;

  MPSCQueue() {
    Node* dummy = new Node();
    head.store(dummy, std::memory_order_seq_cst);
    tail = dummy;
  }

  ~MPSCQueue() {
    Node* node = tail;
    while (node) {
      Node* next = node->next.load(std::memory_order_seq_cst);
      delete node;
      node = next;
    }
  }

  // Called by producers
  void enqueue(T data) {
    enqueue_count.fetch_add(1);
    Node* node = new Node(std::move(data));
    // Swap in the new node as the head
    Node* prev_head = head.exchange(node, std::memory_order_seq_cst);
    // Link the old head to the new node
    prev_head->next.store(node, std::memory_order_seq_cst);
  }

  // Called by the single consumer
  std::optional<T> dequeue() {
    // Check the next pointer of the tail
    Node* next = tail->next.load(std::memory_order_seq_cst);
    if (next) {
      // Move the data out
      T res = std::move(next->data);
      delete tail;
      tail = next;
      dequeue_count += 1;
      return res;
    }
    return std::nullopt;
  }

  size_t size() { return enqueue_count.load() - dequeue_count; }
};

template <typename T>
class MPSCQueueConsumerLock {
  MPSCQueue<T> queue;
  std::counting_semaphore<> sema{0};

 public:
  void enqueue(T data) {
    queue.enqueue(std::move(data));
    // Release the semaphore to notify the consumer
    sema.release();
  }

  // Single consumer calls this
  T dequeue() {
    auto re = queue.dequeue();
    if (re.has_value()) {
      // We have an item, so we expect the semaphore count to be > 0
      if (!sema.try_acquire()) {
        // Unexpectedly fails in rare cases
        std::cerr << __FILE__ << ":" << __FUNCTION__
                  << " sema.try_acquire() should succeed, please check\n";
        assert(false);
      }
      return re.value();
    }
    // Otherwise, block until something is available
    sema.acquire();
    return queue.dequeue().value();
  }

  size_t size() { return queue.size(); }
};

症状

有时(概率很低,但仍有可能)代码会命中:

/path/to/mpsc.hpp:dequeue sema try_acquire should be success, please check
python: /path/to/mpsc.hpp:79: T MPSCQueueConsumerLock<T>::dequeue() [...]
Assertion `false' failed.
Aborted (core dumped)

我已经验证确实只有一个消费者线程,并且这是调用 dequeue() 的唯一地方。

问题:

怎么可能消费者可以看到 MPSCQueue 中的新节点,但没有观察到相应的 sema.release() 调用(因此 try_acquire() 失败)? std::counting_semaphore 或我缺少的内存顺序是否有微妙之处? 有什么建议可以确保消费者对队列的看法和信号量的计数保持一致吗?

额外

  • 我使用 std::memory_order_seq_cst 来执行队列中的所有原子操作。
  • 消费者是严格单线程的;没有其他线程调用 dequeue()。
  • exchange(..., seq_cst) 和 sema.release() 之间是否存在隐藏的内存栅栏问题?

任何指示或建议将不胜感激。谢谢!

c++ memory concurrency memory-barriers
1个回答
0
投票

您的支票中有竞争条件。

void enqueue(T data) {
    queue.enqueue(std::move(data));
    // thread suspended by OS here
    sema.release();
  }

T dequeue() {
    auto re = queue.dequeue();
    if (re.has_value()) {
      // thread gets an item while the other thread is suspended
      // semaphore not incremented yet !
      if (!sema.try_acquire()) {
        ...
  }

如果你有 2 个原子操作 A 和 B,并且 A 在 B 之前发生,如果你看到 B 那么你可以说 A 发生了,但是看到 A 发生并不能推断出 B 上的任何内容,你需要在出队之前递减信号量。

© www.soinside.com 2019 - 2024. All rights reserved.