使用互斥体和信号量实现类似 MPSC 的队列。
最终,消费者将尝试使空队列出队。
我已经编辑了问题以包含一个最小的可重现示例。创建了 5 个生产者,每个生产者会将其 id 入队 4 次。队列大小为 4。
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdlib.h>
typedef struct {
int capacity;
int* data;
int length;
int offset;
} Queue;
typedef struct {
int id;
Queue* queue;
pthread_mutex_t* queue_lock;
sem_t* queue_sem;
} ProducerArgs;
int dequeue(Queue*);
void enqueue(Queue*, int);
void init_queue(Queue*, int);
bool is_queue_empty(Queue*);
bool is_queue_full(Queue*);
void* producer(void*);
int main(int argc, char* argv[]) {
pthread_mutex_t queue_lock;
sem_t queue_sem;
sem_init(&queue_sem, 0, 0);
Queue queue;
init_queue(&queue, 4);
ProducerArgs args[5];
pthread_t producers[5];
for (int i = 0; i < 5; i++) {
args[i].id = i;
args[i].queue = &queue;
args[i].queue_lock = &queue_lock;
args[i].queue_sem = &queue_sem;
pthread_create(&producers[i], NULL, producer, (void*)&args[i]);
}
for (int j = 0; j < 20; j++) {
sem_wait(&queue_sem);
pthread_mutex_lock(&queue_lock);
int value = dequeue(&queue);
pthread_mutex_unlock(&queue_lock);
(void)value;
}
return 0;
}
int dequeue(Queue* queue) {
assert(!is_queue_empty(queue));
int value = queue->data[queue->offset];
queue->offset = (queue->offset + 1) % queue->capacity;
queue->length--;
return value;
}
void enqueue(Queue* queue, int value) {
assert(!is_queue_full(queue));
queue->data[(queue->offset + queue->length) % queue->capacity] = value;
queue->length++;
}
bool is_queue_empty(Queue* queue) {
return queue->length == 0;
}
bool is_queue_full(Queue* queue) {
return queue->length == queue->capacity;
}
void init_queue(Queue* queue, int capacity) {
queue->capacity = capacity;
queue->data = malloc(sizeof(int) * capacity);
queue->length = 0;
queue->offset = 0;
}
void* producer(void* ptr) {
ProducerArgs* args = (ProducerArgs*)ptr;
int completed = 0;
while (completed < 4) {
pthread_mutex_lock(args->queue_lock);
if (is_queue_full(args->queue)) {
pthread_mutex_unlock(args->queue_lock);
} else {
enqueue(args->queue, args->id);
pthread_mutex_unlock(args->queue_lock);
sem_post(args->queue_sem);
completed++;
}
}
return NULL;
}
信号量被初始化为 0。据我所知,这应该给我我想要的语义。消费者应该在进入其出队步骤之前等待 sem_posts,并且每个 sem_post 应该指示另一个值已添加到队列中。我的假设有错吗?
我使用的是 Mac OSX,不支持 sem_init(它返回 -1)。我尝试在 Linux 机器上运行相同的代码,效果很好。
这个故事的寓意:我是个傻瓜,应该检查我的返回码。