我正在尝试解决 Springboot RabbitMQ AMQP 场景中的一个棘手问题。
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0)
之类的异常。相关资料:
@Component
@Slf4j
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${rabbitmq.queue.name}", durable = "true"),
exchange = @Exchange(value = "${rabbitmq.exchange.name}"),
key = "${rabbitmq.queue.name}"
),
concurrency = "${rabbitmq.queue.concurrency}",
ackMode = "MANUAL")
public class ExampleConsumer {
@RabbitHandler
@CircuitBreaker(name = "CONSUMER_CB", fallbackMethod = "fallback")
@SneakyThrows
public void extract(SomeMessage message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
channel.basicAck(tag, Boolean.FALSE);
handleMessage(message);
}
}
你可以认为
handleMessage(message)
方法需要很长时间来处理。
第一步是在下次碰巧查看侦听器容器线程正在做什么时进行线程转储。”
但是,由于这仅发生在生产代码中,并且很难知道它何时会发生,因此我无法轻松获得线程转储。
另外,考虑到我正在执行手动 ACK,我认为这个错误永远不会发生是错误的吗?因为从表面上看,只有当消费者花了太长时间来处理消息并且没有确认它时,才会发生这种情况,但这里的情况并非如此。
所以,这些是我的问题:
您可以在这里使用以下方法,
列表项目:
按照以下步骤创建一个 API,
第1步:创建获取端点
第 2 步: 在 Enum 或 Env 中定义所有队列及其所需的消费者数量
第 3 步: 调用 RabbitMQ api
curl -i -u guest:guest http://localhost:15672/api/queues
获取所有队列详细信息
第 4 步: 循环遍历每个队列并检查消费者是否计数
第 5 步: 如果消费者数量较少,则创建缺失数量的消费者线程
第6步:返回响应
在检查消费者数量时,请确保您也在检查虚拟主机。因为同一队列具有不同的虚拟主机也是可能的。如果您只有单个虚拟主机,请忽略。
设置CronJob:
在 GCP 或 AWS 或您想要的任何地方创建一个 cronjob,并每隔两分钟调用此 API。如果你不熟悉 cronjob 那么你可以上网查一下。