AMQP RabbitMQ 消费者被杀死且未重新启动

问题描述 投票:0回答:1

我正在尝试解决 Springboot RabbitMQ AMQP 场景中的一个棘手问题。

  1. 我有多个队列,并发度设置为 2:
  2. 应用程序正常启动并消费消息。我运行了该应用程序的 3 个实例,因此通常有 6 个消费者运行消费消息。
  3. 过了一段时间(可能是几天),我可以在日志中看到抛出了诸如
    Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0)
    之类的异常。
  4. 这将关闭消费者线程,AMQP 不会重新创建它,并且该消费者消失了。这种情况会发生,直到我没有更多的消费者留下,然后队列继续接收消息,但没有消费者可以使用它们。

相关资料:

  1. 这是一个长时间处理场景,这意味着我正在使用 RabbitMQ 来编排 ETL 工作负载,因此某些消息需要很长时间。我在这里的其他答案中看到,这种情况可能会发生,因为处理消息花费的时间太长,然后超时,因此我更改了消费者以执行手动 ACK:
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${rabbitmq.queue.name}", durable = "true"),
                exchange = @Exchange(value = "${rabbitmq.exchange.name}"),
                key = "${rabbitmq.queue.name}"
        ),
        concurrency = "${rabbitmq.queue.concurrency}",
        ackMode = "MANUAL")
public class ExampleConsumer {

    @RabbitHandler
    @CircuitBreaker(name = "CONSUMER_CB", fallbackMethod = "fallback")
    @SneakyThrows
    public void extract(SomeMessage message,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        channel.basicAck(tag, Boolean.FALSE);
        handleMessage(message);
    }

}

你可以认为

handleMessage(message)
方法需要很长时间来处理。

  1. 我看过 Garry Russel 的一个answer,其中提到“到目前为止,此类问题的最常见原因是容器线程“卡”在用户代码中的某处 - 无论是在侦听器中,还是在由听众;例如僵局。

第一步是在下次碰巧查看侦听器容器线程正在做什么时进行线程转储。”

但是,由于这仅发生在生产代码中,并且很难知道它何时会发生,因此我无法轻松获得线程转储。

另外,考虑到我正在执行手动 ACK,我认为这个错误永远不会发生是错误的吗?因为从表面上看,只有当消费者花了太长时间来处理消息并且没有确认它时,才会发生这种情况,但这里的情况并非如此。

所以,这些是我的问题:

  • 为什么手动 ACK 不能解决这个问题?
  • 我该怎么做才能让 AMQP 自动重新创建消费者?
  • 如果我不能自动做到这一点,我如何创建一个可以检查队列状态并重新创建消费者的看门狗,以防队列没有任何消费者?
  • 无论如何我可以理解是什么错误导致了这个吗?也许我可以实现一个监听器来获取内部异常的详细信息?
java spring-boot rabbitmq amqp
1个回答
0
投票

您可以在这里使用以下方法,

列表项目:

按照以下步骤创建一个 API,

第1步:创建获取端点

第 2 步: 在 Enum 或 Env 中定义所有队列及其所需的消费者数量

第 3 步: 调用 RabbitMQ api

curl -i -u guest:guest http://localhost:15672/api/queues
获取所有队列详细信息

第 4 步: 循环遍历每个队列并检查消费者是否计数

第 5 步: 如果消费者数量较少,则创建缺失数量的消费者线程

第6步:返回响应

在检查消费者数量时,请确保您也在检查虚拟主机。因为同一队列具有不同的虚拟主机也是可能的。如果您只有单个虚拟主机,请忽略。

设置CronJob:

在 GCP 或 AWS 或您想要的任何地方创建一个 cronjob,并每隔两分钟调用此 API。如果你不熟悉 cronjob 那么你可以上网查一下。

© www.soinside.com 2019 - 2024. All rights reserved.