类似 MPSC 的队列因错误出队而失败

问题描述 投票:0回答:1

目标

使用互斥体和信号量实现类似 MPSC 的队列。

问题

最终,消费者将尝试使空队列出队。

最小可重现示例

我已经编辑了问题以包含一个最小的可重现示例。创建了 5 个生产者,每个生产者会将其 id 入队 4 次。队列大小为 4。

#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct {
        int capacity;
        int* data;
        int length;
        int offset;
} Queue;

typedef struct {
        int id;
        Queue* queue;
        pthread_mutex_t* queue_lock;
        sem_t* queue_sem;
} ProducerArgs;

int dequeue(Queue*);
void enqueue(Queue*, int);
void init_queue(Queue*, int);
bool is_queue_empty(Queue*);
bool is_queue_full(Queue*);
void* producer(void*);

int main(int argc, char* argv[]) {
        pthread_mutex_t queue_lock;

        sem_t queue_sem;
        sem_init(&queue_sem, 0, 0);

        Queue queue;
        init_queue(&queue, 4);

        ProducerArgs args[5];
        pthread_t producers[5];
        for (int i = 0; i < 5; i++) {
                args[i].id = i;
                args[i].queue = &queue;
                args[i].queue_lock = &queue_lock;
                args[i].queue_sem = &queue_sem;
                pthread_create(&producers[i], NULL, producer, (void*)&args[i]);
        }

        for (int j = 0; j < 20; j++) {
                sem_wait(&queue_sem);
                pthread_mutex_lock(&queue_lock);
                int value = dequeue(&queue);
                pthread_mutex_unlock(&queue_lock);
                (void)value;
        }

        return 0;
}

int dequeue(Queue* queue) {
        assert(!is_queue_empty(queue));
        int value = queue->data[queue->offset];
        queue->offset = (queue->offset + 1) % queue->capacity;
        queue->length--;
        return value;
}

void enqueue(Queue* queue, int value) {
        assert(!is_queue_full(queue));
        queue->data[(queue->offset + queue->length) % queue->capacity] = value;
        queue->length++;
}

bool is_queue_empty(Queue* queue) {
        return queue->length == 0;
}

bool is_queue_full(Queue* queue) {
        return queue->length == queue->capacity;
}

void init_queue(Queue* queue, int capacity) {
        queue->capacity = capacity;
        queue->data = malloc(sizeof(int) * capacity);
        queue->length = 0;
        queue->offset = 0;
}

void* producer(void* ptr) {
        ProducerArgs* args = (ProducerArgs*)ptr;

        int completed = 0;
        while (completed < 4) {
                pthread_mutex_lock(args->queue_lock);
                if (is_queue_full(args->queue)) {
                        pthread_mutex_unlock(args->queue_lock);
                } else {
                        enqueue(args->queue, args->id);
                        pthread_mutex_unlock(args->queue_lock);
                        sem_post(args->queue_sem);
                        completed++;
                }
        }
        return NULL;
}

注释

信号量被初始化为 0。据我所知,这应该给我我想要的语义。消费者应该在进入其出队步骤之前等待 sem_posts,并且每个 sem_post 应该指示另一个值已添加到队列中。我的假设有错吗?

c mutex semaphore concurrent-queue
1个回答
0
投票

我使用的是 Mac OSX,不支持 sem_init(它返回 -1)。我尝试在 Linux 机器上运行相同的代码,效果很好。

这个故事的寓意:我是个傻瓜,应该检查我的返回码。

© www.soinside.com 2019 - 2024. All rights reserved.