Posix 消息队列:消息(结构类型)总是太长

问题描述 投票:0回答:1

我开始学习在 C 中使用 Posix 消息队列。(我的主机是 Kubuntu 22.04)

我正在尝试编写一个程序,该程序启动指定数量的线程,这些线程正在等待队列中的消息(消息=包含指向已分配内存的指针的结构)。如果有消息,最快的线程会获取消息并以某种方式处理它。

主程序只是启动线程,填充消息队列,然后等待每个线程完成,显示结果。但我总是收到“消息太长”错误。

根据我的研究,我发现这通常是可能的。但由于我好几天都没有得到一个可以工作的程序,我开始对其进行分解,并且由于“消息太长”错误,我得到了以下代码,但该代码仍然无法工作。

据我了解:

  1. 对于消息队列,可以设置最大消息大小(我选择了结构的大小:sizeof(message_t))。
  2. 为了发送,我只是使用我自己的结构和我的结构的大小。
  3. 读取消息时,我必须告诉 mq_receive 要读取的消息的大小(我也将其设置为 sizeof(message_t)

发送总是有效,当我刚刚向队列发送一条消息时,文件(/dev/mqueue/test-123)的内容显示了正确的 QSIZE:70(20 + 50)

我在这里遗漏了什么吗?或者我只是错误地认为可以将结构放入消息队列中。

此外:由于通过消息队列的唯一通信发生在父进程和线程之间,因此在我看来,在它们之间共享地址应该是安全的。

简化后的代码如下所示:

#include <pthread.h>
#include <mqueue.h>
#include <stdint.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h>
#include <unistd.h>

typedef struct message {
    //long int message_type; 
    char path[50];
    char name[20];
} message_t;

int main(int argc, char *argv[])
{
    const char* queue_name = "/test-123";

    // Message Queue File Descriptor for Sending
    mqd_t msgq_fd ;

    // Attributes of the Message queue
    struct mq_attr attr = { 0 };
    attr.mq_maxmsg = 100;
    attr.mq_msgsize = sizeof(message_t);

    // (Create and) open the message Queue for Writing only
    if((msgq_fd = mq_open(queue_name, O_RDWR | O_CREAT, 0660, &attr)) == -1) {
        perror("Sender: mq_open failed");
        exit(4);
    }

    //Put 5 Messages into the Message queue
    for(int i=0; i < 5; i++)
    {
       message_t *msg = calloc(1,sizeof(message_t));
       strcpy(msg->name,"test");
       strcpy(msg->path,"testpath");

        printf("Message %d prepared...", i);

        // Error Sending 
        if(mq_send(msgq_fd, (const char *)msg, sizeof(message_t), 0) == -1) {
                perror("Sender: unable to send message");
                mq_close(msgq_fd);
                exit(10);
        }
        printf("queued\n");
        sleep(1);

    }

    printf("\nReading messages from Queue\n");
    int ret = 0;

    // Struct for storing the message from the Message Queue
    message_t recv_msg;

    // Read 5 messages from the Message Queue
    for(int i=0; i < 5; i++)
    {
        printf("Message %d:\n",i+1);
        sleep(1);

        ret = mq_receive(msgq_fd,(char*)&recv_msg,sizeof(attr.mq_msgsize),NULL);

        // Error receiving
        if(ret == -1) {
            //printf("%d, %p, %ld\n",msgq_fd2, &recv_msg, sizeof(message_t));
            perror("Message receive error");
        }

        // Print the content of the received message
        printf("%s/%s\n\n",recv_msg.path,recv_msg.name);
    }
    return 0;
}
c linux posix ipc mqueue
1个回答
0
投票

我找到了一个可行的解决方案,但我仍然不确定主要问题是什么。

在我的解决方案中现在我有 2 个结构:

  1. message_content_t --> 包含路径和名称
  2. message_t --> 包含一个 char 指针

当我在堆上创建 message_content 时,我将指向堆的地址存储在 char 指针中。当从队列中读取消息时,我将 msg_ptr 从 message_t 结构转换为 message_content_t。

这是我的代码:

#include <pthread.h>
#include <mqueue.h>
#include <stdint.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h>
#include <unistd.h>

#define QUEUE_NAME "/test-123"
#define MAX_MESSAGES 10
#define MAX_MSG_SIZE 100

typedef struct message_content {
    char path[MAX_MSG_SIZE-20];
    char name[20];
} message_content_t;

typedef struct message{
    char *msg_ptr;
} message_t;

int main(int argc, char *argv[])
{
    // Message Queue File Descriptor for Sending
    mqd_t msgq_fd ;

    // Attributes of the Message queue
    struct mq_attr attr;
    attr.mq_flags = 0;
    attr.mq_maxmsg = MAX_MESSAGES;
    attr.mq_msgsize = sizeof(message_t);
    attr.mq_curmsgs = 0;

    // (Create and) open the message Queue for Writing only
    msgq_fd = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT, 0660, &attr);
    if(msgq_fd == (mqd_t)-1) {
        perror("Sender: mq_open failed");
        exit(4);
    }

    //Put 5 Messages into the Message queue
    for(int i=0; i < 5; i++)
    {
        message_content_t *msg_content = calloc(1,sizeof(message_content_t));
        strcpy(msg_content->name,"test");
        strcpy(msg_content->path,"testpath");

        message_t msg;
        msg.msg_ptr = (char *)msg_content;

        printf("Message %d prepared...", i);

        // Error Sending 
        if(mq_send(msgq_fd, (const char *)&msg, sizeof(message_t), 0) == -1) {
                perror("Sender: unable to send message");
                mq_close(msgq_fd);
                exit(10);
        }
        printf("queued\n");
        sleep(1);

    }
    mq_close(msgq_fd);

    printf("\nReading messages from Queue\n");
    // Reopen the message queue for reading
    msgq_fd = mq_open(QUEUE_NAME, O_RDONLY);
    if (msgq_fd == (mqd_t)-1) {
        perror("mq_open");
        exit(EXIT_FAILURE);
    }

    size_t ret = 0;

    // Struct for storing the message from the Message Queue
    message_t recv_msg;

    // Read 5 messages from the Message Queue
    for(int i=0; i < 5; i++)
    {
        printf("(bytes_read: %ld) Message %d:\n",ret,i+1);
        sleep(1);

        ret = mq_receive(msgq_fd,(char*)&recv_msg,sizeof(message_t),NULL);

        // Error receiving
        if(ret == -1) {
            //printf("%d, %p, %ld\n",msgq_fd2, &recv_msg, sizeof(message_t));
            perror("Message receive error");
        }

        // Print the content of the received message
        message_content_t *ptr = (message_content_t *)recv_msg.msg_ptr;
        printf("%s/%s\n\n",ptr->path,ptr->name);
    }
    return 0;
}
© www.soinside.com 2019 - 2024. All rights reserved.