amqp 相关问题

AMQP是一种开放式消息传递标准。它为高性能企业消息传递定义了线级协议和语义框架。如果您正在使用特定代理(例如:[rabbitmq]),则AMQP标记可能是多余的。 AMQP不应与Apache [ActiveMQ]混淆


PHP和RABBITMQ 504 GATEWAWAY超时错误在消费者中

我的班级消费者,兔子的等待消息: <?php namespace App\Infrastructure; use App\Application\Interfaces\BankServiceInterface; use App\Application\Interfaces\ConsumerInterface; use App\Application\Interfaces\MailAgentInterface; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection; class Consumer implements ConsumerInterface { private AMQPStreamConnection $connection; private AMQPChannel $channel; private BankServiceInterface $service; private MailAgentInterface $mailAgent; public function __construct(AMQPStreamConnection $connection, BankServiceInterface $service, MailAgentInterface $mailAgent) { $this->connection = $connection; $this->channel = $connection->channel(); $this->service = $service; $this->mailAgent = $mailAgent; } public function runFromQueue(string $queueName): void { $this->channel->queue_declare( $queueName, false, true, false, false ); $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume( $queueName, '', false, false, false, false, $this->onConsume() ); while(count($this->channel->callbacks)) { $this->channel->wait(); } $this->closeConnection(); } public function closeConnection(): void { $this->channel->close(); $this->connection->close(); } private function onConsume() { return function ($request) { echo '<pre>'; var_dump($request->getBody()); echo '</pre>'; }; } } 初始化这堂课时,他的作品在504 Gateway Tim-Out中失败。但是,如果从方法删除调用方法getBody():: onConsume 这是工作,并在属性中打印带有消息主体的消息对象。 另外,我会收到504个网关超时错误,如果尝试访问此对象中的访问属性private function onConsume() { return function ($request) { echo '<pre>'; var_dump($request); echo '</pre>'; }; : body 它像方法private function onConsume() { return function ($request) { echo '<pre>'; var_dump($request->body); echo '</pre>'; }; 中的循环一样,循环和服务器按超时范围掉落。尝试用方法runFromQueue()替换loop之后:runFromQueue()但结果将像循环一样,如果调用方法consume()或属性public function runFromQueue(string $queueName): void { $this->channel->queue_declare( $queueName, false, true, false, false ); $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume( $queueName, '', false, false, false, false, $this->onConsume() ); try { $this->channel->consume(); } catch (\Throwable $exception) { echo $exception->getMessage(); } $this->closeConnection(); } ,我会收到504 Gateway超时错误,并且此功能与简单打印getBody()。 然后我进行简单的修改,添加类新属性: body 将其放在循环条件下: $request 值此属性更改为private $isMessageRead = true; : public function runFromQueue(string $queueName): void { $this->channel->queue_declare( $queueName, false, true, false, false ); $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume( $queueName, '', false, false, false, false, $this->onConsume() ); while($this->isMessageRead) { $this->channel->wait(); } $this->closeConnection(); } 这项工作,但我对此解决方案不满意。有什么问题?如果没有财产onConsume()?? 这个问题似乎源于消费者如何处理消息确认和连接的生命周期。当您调用private function onConsume() { return function ($request) { echo '<pre>'; var_dump($request->getBody()); echo '</pre>'; $this->isMessageRead = false; }; } 时,它可能正在阻止或无法正确确认消息,从而导致服务器超时。 正在发生的事情: 方法用于处理传入消息。但是,如果您不正确确认消息(即使用isMessageRead),则服务器可以保持连接打开或处于无响应状态,从而导致超时。 您的解决方案是$request->getBody()的,因为您正在手动控制流动,但这不是理想的。 要正确修复此此操作而无需使用诸如wait()的标志,请确保您在处理后确认消息并将控件返回到队列。 任期代码: $request->ack() 这种方式,在处理消息后,您将确认它,它告诉RabbitMQ已消耗消息,并且连接可以继续进行下一条消息而无需无限期地等待。这应该防止504网关超时问题。 通过确保正确的确认,脚本应平稳运行,而无需自定义标志或循环。

回答 1 投票 0

单元测试 Symfony Messenger

我一直在我最新的项目中使用 Symfony Messenger 来实现 AMQP。虽然代码的工作让我非常高兴,但我无法为发送消息的方法编写单元测试......

回答 2 投票 0

如何强制 RabbitMQ 代理 NACK 消息进行测试?

我正在通过 Spring AMQP 使用 RabbitMQ 开发 Spring Boot 项目。我们希望能够在消息发布被 NACK 时测试应用程序的行为(特别是当

回答 3 投票 0

Spring RabbitMQ - RPC - CorrelationId 不匹配 - TopicExchange - 客户端 - 服务器 - 模型

我有 Spring boot - RabbitMQ 应用程序。使用的交换器是topicexchange。 公共静态最终字符串RPC_REQ_QUEUE =“req.queue”; public static Final String RPC_RES_QUEUE = "res.queue&q...

回答 1 投票 0

Azure 服务总线和 Azure 容器应用程序 - 重新部署失败,因为已经有一个客户端针对该主题使用 tha sub

我有一个(WildFly 33 上的 Java 21/EE10)容器,它使用 JMS 2.0(因此是 ESB 高级层)连接到 Azure 服务总线。 一切都很好。 但是,当重新部署(甚至重新启动)POD 时,@St...

回答 1 投票 0

消费不确认来自 RabbitMq 的消息

我创建了一个简单的发布者和一个使用 basic.consume 在队列上订阅的消费者。 当作业运行无异常时,我的消费者会确认这些消息。每当我跑步时...

回答 2 投票 0

AMQP 0-9-1 vs 1-0

我正在为一个新项目寻找消息服务,该项目必须将一些 C# 应用程序与一些 Java 应用程序连接起来。我真的很喜欢 RabbitMQ,因为它似乎有惊人的支持......

回答 2 投票 0

适用于.NET的AMQP V1.0客户端

我正在考虑使用 RabbitMQ 或 ActiveMQ 等产品。我发现这些产品在一定程度上对 AMQP v1.0 有一定程度的支持。 然而,我正在努力寻找一个cl...

回答 3 投票 0

在 Pika 中获取队列大小(AMQP Python)

简单的问题,但Google或Pika开源代码没有帮助。有没有办法在 Pika 中查询当前队列大小(项目计数器)?

回答 7 投票 0

如何在 Python SDK 中使 Azure 服务总线客户端和发送方实例保持活动状态超过 10 分钟?

我目前正在致力于Azure服务总线队列和自定义网关服务的集成。每次服务收到事件时,都应该将消息发送到专用队列。 我已经成功确认...

回答 1 投票 0

rabbitMQ 中的 Ack 或 Nack

我正在使用rabbitMQ,我使用basic_get从队列中获取每条消息,而没有自动确认过程,这意味着消息保留在队列中,直到我确认或确认消息。 有时我也有

回答 3 投票 0

如何使用 AMQP 将消息发布到 Solace 代理中的主题

我们有一个使用smallrye-amqp 连接器的Quarkus 应用程序。我有以下用于将消息发布到主题的代码。 @ApplicationScoped 公共类 InsightLifecycleObserver { 公共站...

回答 1 投票 0

_ITERATOR_DEBUG_LEVEL 项目中一个文件不匹配

我正在尝试使用 amqpcpp 库在 Visual Studio Community 2022 (v143) 中编译我的 C++ 项目,但没有成功。 我已经使用其 github 页面上的说明构建了 amqpcpp 库文件,并且

回答 1 投票 0

RabbitMQ 用户权限格式

我正在尝试使用rabbitmqctl在RabbitMQ中配置用户权限。 RabbitMQ 文档 http://www.rabbitmq.com/man/rabbitmqctl.1.man.html 给出了设置配置的基本示例,w...

回答 2 投票 0

Spring-AMQP:问题,如果 @RabbitListener 不返回 void?

注释为 @RabbitListener 的方法不返回 void 是一个问题还是不好的做法? 为了以同步方式重用,我更改了方法,返回结果而不是 void: @

回答 1 投票 0

Spring-boot RabbitMQ 轮换凭证

背景信息 我正在使用 spring-boot-starter-amqp 依赖项连接到 RabbitMQ。 在我的 application.yaml 中我有: 春天: 兔子MQ: 主持人: 端口:5672 用户名: <

回答 1 投票 0

RabbitMQ / amqplib -- 错误:帧大小超过帧最大值

与 RabbitMQ 的连接失败并出现错误:帧大小超出帧最大值。 虽然StackOverflow和Github上也提出了一些类似的问题,但仍然很模糊。 人们假设 v...

回答 3 投票 0

从 Apache Qpid java 客户端使用 RabbitMQ 服务器消息?哪个版本?

我必须使用带有Apache qpid java客户端的rabbitmq-server(因为我必须使用Apache Camel)。但我对它们两个的版本感到困惑,特别是因为 amqp 协议支持......

回答 1 投票 0

使用 AMQP 注释的 ActiveMQ Artemis 重新交付延迟

在artemis中,当使用AMQP时,有一个注释表明您希望稍后发送消息,称为x-opt-delivery-time https://activemq.apache.org/components/artemis/

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.