如何在不使用无锁机制的情况下实现最大吞吐量?关于线程安全循环队列

问题描述 投票:0回答:1

我对 enqueueTS()dequeueTS() 使用两个互斥体,以便写入器和读取器线程可以同时运行,而无需互相等待。然而,enqueueTS()dequeueTS()都会影响structqueue_t中的成员numOfElements,最终导致竞争条件。

队列.c:

struct queue_t {
  unsigned head;
  unsigned tail;
  unsigned size;
  unsigned numOfElements;
  unsigned elementSize;
  char data[];
};

int enqueue(queue_t *me, const void *item) {
  if (!me || !item) {
    queue_error_callback("%s: invalid input\n", __func__);
    return -1;
  }

  if (me->numOfElements == me->size) {
    queue_error_callback("%s: queue is full\n", __func__);
    return -1;
  }

  memcpy(&me->data[me->tail * me->elementSize], item, me->elementSize);

  if (++me->tail == me->size) {
    me->tail = 0;
  }

  me->numOfElements++;

  return 0;
}

int dequeue(queue_t *me, void *item) {
  if (!me || !item) {
    queue_error_callback("%s: invalid input\n", __func__);
    return -1;
  }

  if (me->numOfElements == 0) {
    queue_error_callback("%s: queue is empty\n", __func__);
    return -1;
  }

  memcpy(item, &me->data[me->head * me->elementSize], me->elementSize);

  if (++me->head == me->size) {
    me->head = 0;
  }

  me->numOfElements--;

  return 0;
}

线程安全队列.c

struct thread_safe_queue {
  queue_t *queue;
  pthread_mutex_t enqueue_mutex;
  pthread_mutex_t dequeue_mutex;
  pthread_cond_t queue_not_full;
  pthread_cond_t queue_not_empty;
};

int enqueueTS(ts_queue_t *tsq, const void *item) {
  if (!tsq || !item) {
    queue_error_callback("%s: invalid input\n", __func__);
    return -1;
  }

  pthread_mutex_lock(&tsq->enqueue_mutex);
  while (numOfEmptySlots(tsq->queue) == 0) {
    pthread_cond_wait(&tsq->queue_not_full, &tsq->enqueue_mutex);
  }
  int ret = enqueue(tsq->queue, item);
  pthread_cond_signal(&tsq->queue_not_empty);
  pthread_mutex_unlock(&tsq->enqueue_mutex);
  return ret;
}

int dequeueTS(ts_queue_t *tsq, void *item) {
  if (!tsq || !item) {
    queue_error_callback("%s: invalid input\n", __func__);
    return -1;
  }

  pthread_mutex_lock(&tsq->dequeue_mutex);
  while (numOfElements(tsq->queue) == 0) {
    pthread_cond_wait(&tsq->queue_not_empty, &tsq->dequeue_mutex);
  }
  int ret = dequeue(tsq->queue, item);
  pthread_cond_signal(&tsq->queue_not_full);
  pthread_mutex_unlock(&tsq->dequeue_mutex);
  return ret;
}

一个简单的解决方案是使 numOfElements 原子化。然而,我不想这样做,因为

  1. 我想使用纯锁机制。
  2. 我想将纯队列模块与线程相关的功能分开,以便遵循开闭原则。

我的问题:是否有其他方法可以使 enqueueTS()dequeueTS() 在没有竞争条件的情况下同时运行,同时避免修改queue.c?

c multithreading mutex race-condition circular-queue
1个回答
0
投票

我对 enqueueTS()dequeueTS() 使用两个互斥体,以便写入器和读取器线程可以同时运行,而无需互相等待。然而,enqueueTS()dequeueTS()都会影响structqueue_t中的成员numOfElements,最终导致竞争条件。

没有“最终”。 您的代码确实存在涉及

numOfElements
成员的数据竞争。

是否有 [...] 方法可以使 enqueueTS()dequeueTS() 在没有竞争条件的情况下同时运行,同时避免修改queue.c?

没有。

数据竞争是由于两个线程访问同一非原子对象而引起的,其中至少有一个访问是写入(“冲突访问”),而没有适当的同步措施。您开始使用的

enqueue()
dequeue()
函数对
numOfElements
成员执行冲突的访问。 如果您无法修改它们,那么您唯一的选择就是除了它们自身之外,还使它们彼此同步。 通过相同的互斥锁来访问它们似乎是最自然的方式。 不过,您可以继续使用不同的 CV,这在面对多个入队和/或多个出队时可能是一个优势。

另请注意,仅使

numOfElements
原子化并不是解决方案。 尽管您可以通过这种方式消除数据竞争,但这仍然会给您带来广义的竞争条件,您需要通过对
enqueue
dequeue
函数进行适当修改来解决该问题。

最后,我发现你的线程安全版本有一个底层队列没有的设计漏洞:如果入队和出队不平衡,它会死锁至少一个线程。

© www.soinside.com 2019 - 2024. All rights reserved.