message-queue 相关问题

“消息队列”是设计模式或软件工程组件,其定义用于两个或更多相互关联的过程或系统之间的通信的规则或API。消息队列强制执行异步处理和松散耦合。根据实施情况,它们还可以提供消息的传递和订单保证。消息处理保证明确排除在设计之外。

如何监听侧线程中的事件?

我正在制作使用 binance websocket api 获取信息的应用程序。我们已经有了主要代码库,但决定在独立应用程序中实现 websockets。 它看起来像这样: 导入异步 来自

回答 1 投票 0

消息队列使用什么协议与客户端通信?

我曾经使用过消息队列,但从未想过,它们使用什么协议进行通信?基本上它们都支持阻塞方法,例如 // 使线程休眠,直到有可用消息为止 ...

回答 1 投票 0

借助线程创建消息队列

我正在学习创建线程以及它们如何相互交互,所以我想创建一个简单的消息队列。 我遵循的模型是这样的: 现在我有点陷入一个问题,我...

回答 1 投票 0

win32 ShellExecute 从队列中删除消息

调用 ShellExecute 似乎会从消息队列中删除消息。下面提供了用于测试此行为的 C 程序的源代码。请注意,我正在使用 ShellExecute 打开 ...

回答 1 投票 0

AWS SQS 与其链接的 Lambda 之间的映射问题

背景 我有一个带有此签名的函数: 公共字符串函数(MyCommand cmd,ILambdaContext 上下文) 我的命令在哪里: 公共类我的命令 { 公共字符串网址{获取;放; } =

回答 1 投票 0

push/pop 真的不能作为云服务使用吗?

我们正在寻找一种消息队列系统,确保根据消息的行号顺序处理消息。 这似乎是计算的基础部分 - 但我们似乎无法......

回答 2 投票 0

消息队列:Kafka 与 RabbitMQ?

我是消息队列和流媒体的新手。我对RabbitMQ和Kafka的区别和应用有疑问。 有人可以解释一下技术差异以及每个差异通常如何......

回答 1 投票 0

getPartitionedTopicMetadata 时无法获取连接 - 连接握手失败

我按照本教程设置了apache pulsar:https://pulsar.apache.org/docs/3.0.x/deploy-aws/,但我在教程中使用aws ansible动态清单而不是terraform清单。该项目...

回答 1 投票 0

RPA 机器人与 RabbitMQ 消息代理

请仔细阅读描述,有任何疑问可以向我提问。 我正在研究 RPA 机器人。在我的项目中,我有两个机器人。 该项目使用两个机器人自主运行:Bot1 和 Bo...

回答 0 投票 0

PHP Rabbitmq 如何检查消费者是否正在运行?

根据 Rabbitmq 文档,我编写了一个用于消费队列的脚本。但它是非常基本的例子。该脚本将作为 cron 运行,因为我需要确保它会恢复消耗过程,如果 previ ...

回答 0 投票 0

RabbitMQ - 发射后忘记

我想在 C# 中实现一个发布者,它将消息写入 RabbitMQ 队列。消息的编写不应影响其余过程的性能,并且应该...

回答 0 投票 0

消息系统的应用集成场景?

我有一个场景,其中有一个后端系统和一个前端系统,它们之间有一个基于 .NET 的中间件。 后端系统提供前端系统使用的 REST API。

回答 1 投票 0

带有分布式集线器服务器的实时 SignalR 架构

请帮助我确定以下是否是创建分布式 signalR 通信而不会丢失消息的正确架构。我假设了以下事情 Azure 函数 + 队列:对于

回答 0 投票 0

.NET - 消息代理共享数据模型

我有一个关于通过消息代理发送消息的问题。我正在开发一个使用干净架构的 .NET 应用程序,我有一个 SharedContracts 项目,其中有消息包装器 ...

回答 0 投票 0

推送通知/邮件应用程序的消息队列?

我需要一个消息队列,我可以在其中为每个用户订购消息,并且会有多个消费者,每个消费者可以接收任何未被另一个消费者处理的消息。消费者可能会...

回答 1 投票 0

亲子沟通问题

父母有“m”条消息要发送给“n”个孩子。在每次迭代中,它向所有子节点发送一条消息。如果所有回复“已收到”,它会发送下一条消息。下面是我的代码 #包括 父母有“m”条消息要发送给“n”个孩子。在每次迭代中,它向所有子节点发送一条消息。如果所有回复“已收到”,它会发送下一条消息。下面是我的代码 #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/wait.h> #include <stdio.h> #include <string.h> #include <signal.h> #include <stdlib.h> #include <unistd.h> #define MAX_LEN 50 struct details { int no; int id; char msg_text[MAX_LEN]; }; typedef struct msg_buf { long int msg_type; struct details d; } msg; char message[20][50] = { " ", "Message1", "Message2", "Message3", "Message4", "Message5", "Message6", "Message7", "Message8" }; int main(int argc, char *argv[]) { int id1, id2, n, m, i, j, status, p_id; pid_t pid; msg snd, rcvd; if (argc < 2) { perror("Too few arguments"); return -1; } n = atoi(argv[1]); m = atoi(argv[2]); id1 = msgget((key_t)78, IPC_CREAT | 0666); id2 = msgget((key_t)56, IPC_CREAT | 0666); if (id1 == -1 || id2 == -1) { printf("Error in creating message queue\n"); return -1; } p_id = getpid(); printf("\nQueues are created.\n\n"); for (i = 1; i <= n; i++) { pid = fork(); if (pid == 0) break; } if (pid == 0) { for (j = 1; j <= m; j++) { printf("Process %d waiting for message %d\n", getpid(), j); while (msgrcv(id1, &rcvd, sizeof(struct details), j + 1, IPC_NOWAIT) < 0) { } srand(time(0)); strcpy(snd.d.msg_text, "Received"); snd.msg_type = rcvd.msg_type; snd.d.no = rcvd.d.no; snd.d.id = getpid(); if (msgsnd(id2, &snd, sizeof(struct details), IPC_NOWAIT) < 0) { perror("msgsnd"); } else { printf("Reply sending successful for message %d for process %d\n", snd.d.no, snd.d.id); } } } if (p_id == getpid()) { for (j = 1; j <= m; j++) { strcpy(snd.d.msg_text, message[j]); snd.msg_type = j + 1; snd.d.no = j; if (msgsnd(id1, &snd, sizeof(struct details), 0) < 0) { perror("msgsnd"); } else { printf("Message %d sent- \t\t\t\t%s\n", j, snd.d.msg_text); } for (i = 1; i <= n; i++) { sleep(i); if (msgrcv(id2, &rcvd, sizeof(struct details), j + 1, 0) >= 0) { printf("Reply received from child %d for message %d - \t%s\n", rcvd.d.id, j, rcvd.d.msg_text); } } } wait(NULL); printf("Communication End.\n\n"); } } 我得到的输出是n=2,m=2是: Queues are created. Message 1 sent- Message1 Process 14804 waiting for message 1 Process 14805 waiting for message 1 Reply sending successful for message 1 for process 14804 Process 14804 waiting for message 2 Reply received from child 14804 for message 1 - Received 此后什么也没有打印。没有任何进展。问题是什么?如何解决这个问题? 最大的单一问题是在评论中提到的一个: AFAICS,您每次迭代发送一条消息,因此最多一个孩子可以阅读它。您不会向每个孩子发送一条消息。 任何给定的消息都可以被(最多)一个进程接收。对于 n 进程接收消息,您必须发送 n 消息。 其他问题主要是装饰性的。我使用了 GitHub 上我的 SOQ(堆栈溢出问题)存储库中可用的日志记录代码作为文件 stderr.c 和 stderr.h 在 src/libsoq 子目录中。一些消息被写入 stdout — 没有只接受文件流的函数,所以我使用 err_logmsg() 接受文件流、控制选项和退出状态(但函数不应该退出,所以这是未使用的)以及格式和参数。 结果是: /* SO 7592-2454 */ #include <errno.h> #include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/wait.h> #include <unistd.h> #include "stderr.h" #define MAX_LEN 50 struct details { int no; int id; char msg_text[MAX_LEN]; }; typedef struct msg_buf { long int msg_type; struct details d; } msg; static char message[20][50] = { " ", "Message1", "Message2", "Message3", "Message4", "Message5", "Message6", "Message7", "Message8" }; static const char usestr[] = "num_kids num_msgs"; static void dump_message(const char *tag, const msg *info) { err_logmsg(stdout, err_getlogopts(), 0, "%s: type %ld (details: no %d, id %d, text [%s])\n", tag, info->msg_type, info->d.no, info->d.id, info->d.msg_text); } int main(int argc, char *argv[]) { err_setarg0(argv[0]); if (argc != 3) err_usage(usestr); int n = atoi(argv[1]); int m = atoi(argv[2]); int id1 = msgget((key_t)78, IPC_CREAT | 0666); int id2 = msgget((key_t)56, IPC_CREAT | 0666); if (id1 == -1 || id2 == -1) err_syserr("failed to create a message queue: "); pid_t p_id = getpid(); err_setlogopts(ERR_PID|ERR_MILLI); err_logmsg(stdout, err_getlogopts(), 0, "Queues are created.\n"); pid_t pid; for (int i = 1; i <= n; i++) { pid = fork(); if (pid == 0) break; } msg snd; msg rcvd; if (pid == 0) { for (int j = 1; j <= m; j++) { err_logmsg(stdout, err_getlogopts(), 0, "Waiting for message %d\n", j); while (msgrcv(id1, &rcvd, sizeof(struct details), j + 1, IPC_NOWAIT) < 0) err_syserr("msgrcv() returned with error status: "); dump_message("Message received", &rcvd); strcpy(snd.d.msg_text, "Received"); snd.msg_type = rcvd.msg_type; snd.d.no = rcvd.d.no; snd.d.id = getpid(); if (msgsnd(id2, &snd, sizeof(struct details), IPC_NOWAIT) < 0) err_sysrem("msgsnd() failed: "); else dump_message("Message sent", &snd); } err_logmsg(stdout, err_getlogopts(), 0, "Child process complete\n"); exit(EXIT_SUCCESS); } if (p_id == getpid()) { for (int j = 1; j <= m; j++) { strcpy(snd.d.msg_text, message[j]); snd.msg_type = j + 1; snd.d.no = j; for (int i = 0; i < n; i++) { if (msgsnd(id1, &snd, sizeof(struct details), 0) < 0) err_sysrem("msgsnd() failed: "); else dump_message("Message sent", &snd); } for (int i = 1; i <= n; i++) { err_logmsg(stdout, err_getlogopts(), 0, "Dozing for %d seconds\n", i); sleep(i); if (msgrcv(id2, &rcvd, sizeof(struct details), j + 1, 0) >= 0) dump_message("Reply received", &rcvd); else err_syserr("msgrcv() failed: "); } } int corpse; int status; while ((corpse = wait(&status)) > 0) err_remark("Child %d exited with status 0x%.4X\n", corpse, status); err_logmsg(stdout, err_getlogopts(), 0, "Communication End.\n\n"); } return 0; } 有大量的日志记录,因为我需要看看发生了什么。代码应该验证 n 和 m 的值(它们应该至少是 1 并且不超过一些微弱的理智数字(例如 10,但会有一个或两个值(枚举或定义常数)来限制可接受的范围。 这是一个示例输出(源代码msg79.c,程序msg79): $ ./msg79 2 3 msg79: 2023-04-03 21:36:38.215 - pid=67247: Queues are created. msg79: 2023-04-03 21:36:38.217 - pid=67247: Message sent: type 2 (details: no 1, id 1, text [Message1]) msg79: 2023-04-03 21:36:38.217 - pid=67247: Message sent: type 2 (details: no 1, id 1, text [Message1]) msg79: 2023-04-03 21:36:38.217 - pid=67247: Dozing for 1 seconds msg79: 2023-04-03 21:36:38.217 - pid=67248: Waiting for message 1 msg79: 2023-04-03 21:36:38.217 - pid=67249: Waiting for message 1 msg79: 2023-04-03 21:36:38.217 - pid=67248: Message received: type 2 (details: no 1, id 1, text [Message1]) msg79: 2023-04-03 21:36:38.217 - pid=67249: Message received: type 2 (details: no 1, id 1, text [Message1]) msg79: 2023-04-03 21:36:38.217 - pid=67248: Message sent: type 2 (details: no 1, id 67248, text [Received]) msg79: 2023-04-03 21:36:38.217 - pid=67248: Waiting for message 2 msg79: 2023-04-03 21:36:38.217 - pid=67249: Message sent: type 2 (details: no 1, id 67249, text [Received]) msg79: 2023-04-03 21:36:38.218 - pid=67248: Message received: type 3 (details: no 2, id 1, text [Message2]) msg79: 2023-04-03 21:36:38.218 - pid=67249: Waiting for message 2 msg79: 2023-04-03 21:36:38.218 - pid=67249: Message received: type 3 (details: no 2, id 1, text [Message2]) msg79: 2023-04-03 21:36:38.218 - pid=67248: Message sent: type 3 (details: no 2, id 67248, text [Received]) msg79: 2023-04-03 21:36:38.218 - pid=67249: Message sent: type 3 (details: no 2, id 67249, text [Received]) msg79: 2023-04-03 21:36:38.218 - pid=67248: Waiting for message 3 msg79: 2023-04-03 21:36:38.219 - pid=67249: Waiting for message 3 msg79: 2023-04-03 21:36:38.219 - pid=67248: Message received: type 4 (details: no 3, id 1, text [Message3]) msg79: 2023-04-03 21:36:38.219 - pid=67249: Message received: type 4 (details: no 3, id 1, text [Message3]) msg79: 2023-04-03 21:36:38.219 - pid=67248: Message sent: type 4 (details: no 3, id 67248, text [Received]) msg79: 2023-04-03 21:36:38.219 - pid=67249: Message sent: type 4 (details: no 3, id 67249, text [Received]) msg79: 2023-04-03 21:36:38.219 - pid=67248: Child process complete msg79: 2023-04-03 21:36:38.219 - pid=67249: Child process complete msg79: 2023-04-03 21:36:39.219 - pid=67247: Reply received: type 2 (details: no 1, id 67248, text [Received]) msg79: 2023-04-03 21:36:39.219 - pid=67247: Dozing for 2 seconds msg79: 2023-04-03 21:36:41.220 - pid=67247: Reply received: type 2 (details: no 1, id 67249, text [Received]) msg79: 2023-04-03 21:36:41.220 - pid=67247: Message sent: type 3 (details: no 2, id 1, text [Message2]) msg79: 2023-04-03 21:36:41.220 - pid=67247: Message sent: type 3 (details: no 2, id 1, text [Message2]) msg79: 2023-04-03 21:36:41.220 - pid=67247: Dozing for 1 seconds msg79: 2023-04-03 21:36:42.221 - pid=67247: Reply received: type 3 (details: no 2, id 67248, text [Received]) msg79: 2023-04-03 21:36:42.221 - pid=67247: Dozing for 2 seconds msg79: 2023-04-03 21:36:44.223 - pid=67247: Reply received: type 3 (details: no 2, id 67249, text [Received]) msg79: 2023-04-03 21:36:44.223 - pid=67247: Message sent: type 4 (details: no 3, id 1, text [Message3]) msg79: 2023-04-03 21:36:44.223 - pid=67247: Message sent: type 4 (details: no 3, id 1, text [Message3]) msg79: 2023-04-03 21:36:44.223 - pid=67247: Dozing for 1 seconds msg79: 2023-04-03 21:36:45.227 - pid=67247: Reply received: type 4 (details: no 3, id 67248, text [Received]) msg79: 2023-04-03 21:36:45.227 - pid=67247: Dozing for 2 seconds msg79: 2023-04-03 21:36:47.227 - pid=67247: Reply received: type 4 (details: no 3, id 67249, text [Received]) msg79: 2023-04-03 21:36:47.227 - pid=67247: Child 67249 exited with status 0x0000 msg79: 2023-04-03 21:36:47.227 - pid=67247: Child 67248 exited with status 0x0000 msg79: 2023-04-03 21:36:47.228 - pid=67247: Communication End. $

回答 1 投票 0

.NET 6 中的两个任务之间交换数据

我的应用程序中有两个独立运行的任务。其中一个是从设备获取一些数据,另一个应该是 收集该数据。 他们俩都是“永远的循环&qu...

回答 1 投票 0

验证 LavinMQ 版本

我最近一直在探索 AMQP 消息代理 LavinMQ。 我目前正在尝试弄清楚我的服务器上运行的代理版本。 是否有用于此的 CLI 命令?也许一些...

回答 1 投票 0

如何使用消息队列在 C 中的进程之间获得随机暂停?

我正在编写一个使用进程间消息队列的 C 程序。我正在尝试重现进程 J(法官)和 n-child 进程之间的拍卖。程序从输入文件 txt 和 nu...

回答 0 投票 0

如何执行广泛的业务逻辑并从失败的地方重试

我正在尝试解决业务逻辑中发生大量事务的问题。 例如: 请求到达服务器(服务A) 第 1 步 - 数据库中的事务 第 2 步 - 呼叫服务 B

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.