我开始学习在 C 中使用 Posix 消息队列。(我的主机是 Kubuntu 22.04)
我正在尝试编写一个程序,该程序启动指定数量的线程,这些线程正在等待队列中的消息(消息=包含指向已分配内存的指针的结构)。如果有消息,最快的线程会获取消息并以某种方式处理它。
主程序只是启动线程,填充消息队列,然后等待每个线程完成,显示结果。但我总是收到“消息太长”错误。
根据我的研究,我发现这通常是可能的。但由于我好几天都没有得到一个可以工作的程序,我开始对其进行分解,并且由于“消息太长”错误,我得到了以下代码,但该代码仍然无法工作。
据我了解:
发送总是有效,当我刚刚向队列发送一条消息时,文件(/dev/mqueue/test-123)的内容显示了正确的 QSIZE:70(20 + 50)
我在这里遗漏了什么吗?或者我只是错误地认为可以将结构放入消息队列中。
此外:由于通过消息队列的唯一通信发生在父进程和线程之间,因此在我看来,在它们之间共享地址应该是安全的。
简化后的代码如下所示:
#include <pthread.h>
#include <mqueue.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
typedef struct message {
//long int message_type;
char path[50];
char name[20];
} message_t;
int main(int argc, char *argv[])
{
const char* queue_name = "/test-123";
// Message Queue File Descriptor for Sending
mqd_t msgq_fd ;
// Attributes of the Message queue
struct mq_attr attr = { 0 };
attr.mq_maxmsg = 100;
attr.mq_msgsize = sizeof(message_t);
// (Create and) open the message Queue for Writing only
if((msgq_fd = mq_open(queue_name, O_RDWR | O_CREAT, 0660, &attr)) == -1) {
perror("Sender: mq_open failed");
exit(4);
}
//Put 5 Messages into the Message queue
for(int i=0; i < 5; i++)
{
message_t *msg = calloc(1,sizeof(message_t));
strcpy(msg->name,"test");
strcpy(msg->path,"testpath");
printf("Message %d prepared...", i);
// Error Sending
if(mq_send(msgq_fd, (const char *)msg, sizeof(message_t), 0) == -1) {
perror("Sender: unable to send message");
mq_close(msgq_fd);
exit(10);
}
printf("queued\n");
sleep(1);
}
printf("\nReading messages from Queue\n");
int ret = 0;
// Struct for storing the message from the Message Queue
message_t recv_msg;
// Read 5 messages from the Message Queue
for(int i=0; i < 5; i++)
{
printf("Message %d:\n",i+1);
sleep(1);
ret = mq_receive(msgq_fd,(char*)&recv_msg,sizeof(attr.mq_msgsize),NULL);
// Error receiving
if(ret == -1) {
//printf("%d, %p, %ld\n",msgq_fd2, &recv_msg, sizeof(message_t));
perror("Message receive error");
}
// Print the content of the received message
printf("%s/%s\n\n",recv_msg.path,recv_msg.name);
}
return 0;
}
我找到了一个可行的解决方案,但我仍然不确定主要问题是什么。
在我的解决方案中现在我有 2 个结构:
当我在堆上创建 message_content 时,我将指向堆的地址存储在 char 指针中。当从队列中读取消息时,我将 msg_ptr 从 message_t 结构转换为 message_content_t。
这是我的代码:
#include <pthread.h>
#include <mqueue.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define QUEUE_NAME "/test-123"
#define MAX_MESSAGES 10
#define MAX_MSG_SIZE 100
typedef struct message_content {
char path[MAX_MSG_SIZE-20];
char name[20];
} message_content_t;
typedef struct message{
char *msg_ptr;
} message_t;
int main(int argc, char *argv[])
{
// Message Queue File Descriptor for Sending
mqd_t msgq_fd ;
// Attributes of the Message queue
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = MAX_MESSAGES;
attr.mq_msgsize = sizeof(message_t);
attr.mq_curmsgs = 0;
// (Create and) open the message Queue for Writing only
msgq_fd = mq_open(QUEUE_NAME, O_WRONLY | O_CREAT, 0660, &attr);
if(msgq_fd == (mqd_t)-1) {
perror("Sender: mq_open failed");
exit(4);
}
//Put 5 Messages into the Message queue
for(int i=0; i < 5; i++)
{
message_content_t *msg_content = calloc(1,sizeof(message_content_t));
strcpy(msg_content->name,"test");
strcpy(msg_content->path,"testpath");
message_t msg;
msg.msg_ptr = (char *)msg_content;
printf("Message %d prepared...", i);
// Error Sending
if(mq_send(msgq_fd, (const char *)&msg, sizeof(message_t), 0) == -1) {
perror("Sender: unable to send message");
mq_close(msgq_fd);
exit(10);
}
printf("queued\n");
sleep(1);
}
mq_close(msgq_fd);
printf("\nReading messages from Queue\n");
// Reopen the message queue for reading
msgq_fd = mq_open(QUEUE_NAME, O_RDONLY);
if (msgq_fd == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}
size_t ret = 0;
// Struct for storing the message from the Message Queue
message_t recv_msg;
// Read 5 messages from the Message Queue
for(int i=0; i < 5; i++)
{
printf("(bytes_read: %ld) Message %d:\n",ret,i+1);
sleep(1);
ret = mq_receive(msgq_fd,(char*)&recv_msg,sizeof(message_t),NULL);
// Error receiving
if(ret == -1) {
//printf("%d, %p, %ld\n",msgq_fd2, &recv_msg, sizeof(message_t));
perror("Message receive error");
}
// Print the content of the received message
message_content_t *ptr = (message_content_t *)recv_msg.msg_ptr;
printf("%s/%s\n\n",ptr->path,ptr->name);
}
return 0;
}