Unable to reproduce a RabbitMQ error in an RMQ consumer Spring Boot service


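There is a Spring Boot service where I work. In short, all this service does is trigger RabbitMQ consumers to start consuming messages from the different queues they are subscribed to. We can toggle these consumers on or off through controllers. Say the support service (let me call it SS) listens to 5 services; we then have 5 controllers configured to toggle the start/stop of these consumers.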

Note: our publishers are upstream systems that we do not control.

The queues bound to each service's exchange follow this pattern:

  • _QUEUE
  • _DEADLETTER
  • _PARKING_LOT

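These are configured so that when a message reaches the exchange, it is routed to the default queue. If processing fails, the message is routed to the dead-letter queue and retried from there 3 times. After the 4th attempt (1 normal flow + 3 retry flows), the message is routed to the parking-lot queue for storage. SS then consumes these messages and stores the records in a database for persistence and later analysis.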
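To make the retry flow above concrete, here is a minimal plain-Java sketch of the routing decision. It is only a model under stated assumptions, not the service's real code: the class `RetryRouting`, the `MAX_RETRIES` constant, and the choice to read the retry count from the `count` field of the first `x-death` entry are all hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the retry routing described above; not the service's real code. */
final class RetryRouting {

    /** Assumption: 3 retries through the dead-letter queue before parking. */
    static final int MAX_RETRIES = 3;

    enum Destination { DEADLETTER, PARKING_LOT }

    /** Reads "count" from the first x-death entry; returns 0 when the header is absent. */
    static long retryCount(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List) || ((List<?>) xDeath).isEmpty()) {
            return 0L;
        }
        Object first = ((List<?>) xDeath).get(0);
        if (first instanceof Map) {
            Object count = ((Map<?, ?>) first).get("count");
            if (count instanceof Number) {
                return ((Number) count).longValue();
            }
        }
        return 0L;
    }

    /** Decides where a message goes after a failed processing attempt. */
    static Destination routeAfterFailure(Map<String, Object> headers) {
        return retryCount(headers) >= MAX_RETRIES
                ? Destination.PARKING_LOT
                : Destination.DEADLETTER;
    }

    /** Builds headers with a single x-death entry, for demonstration only. */
    static Map<String, Object> headersWithDeathCount(long count) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("reason", "expired");
        entry.put("count", count);
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", Collections.singletonList(entry));
        return headers;
    }

    public static void main(String[] args) {
        // First failure: no x-death header yet, so the message is retried.
        System.out.println(routeAfterFailure(new HashMap<>()));          // DEADLETTER
        // Two expirations so far: one retry left.
        System.out.println(routeAfterFailure(headersWithDeathCount(2))); // DEADLETTER
        // Third retry failed: 1 normal + 3 retry attempts are used up.
        System.out.println(routeAfterFailure(headersWithDeathCount(3))); // PARKING_LOT
    }
}
```

With this model, a message that reaches the parking-lot queue always carries an `x-death` header, which matters for how a consumer on that queue treats conversion failures.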

The problem is that we get an x-death error, which in turn discards the message and prevents SS from storing the payload in the database. Here are the error logs:

These logs come in groups of 4, as shown in the comments I added.

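As the logs show, there appears to be a problem consuming messages that carry the x-death header, and the failure is reported as a fatal message conversion error. But that does not seem to be the real cause. I tried to reproduce this error in my dev environment, but it is so inconsistent that it rarely reproduces. Some points worth noting: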

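  • For correct payloads, nothing seems to go wrong.
  • Even when the payload is bad, the message goes to the dead-letter queue (with the x-death header attached). Sometimes SS consumes it without any problem and stores the payload in the database. In rare cases, however, it throws this x-death header error.
  • Whenever this x-death error is thrown, the record is not consumed and is not stored in the database. In addition, the payload is not printed in the logs (important for RCA in prod, since this issue was found in prod).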
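The log messages quoted in this thread ("Fatal message conversion error; message rejected", "x-death header detected on a message with a fatal exception ... discarding", and `ImmediateAcknowledgeAmqpException: Fatal and x-death present`) suggest that the error handler treats a fatal exception differently when the message already carries an `x-death` header. The following plain-Java sketch models that decision as I read it from the logs. It is a simplified stand-in, not Spring AMQP's actual code; the class `FatalDiscardModel` and the outcome names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the decision the logs suggest; not Spring AMQP's actual code. */
final class FatalDiscardModel {

    enum Outcome {
        /** Non-fatal failure: the error handler does not intervene. */
        LEFT_TO_CONTAINER,
        /** Fatal failure on a first-time message: rejected, may go to a DLX if configured. */
        REJECTED_WITHOUT_REQUEUE,
        /** Fatal failure on a message that already died once: acknowledged and dropped. */
        ACKNOWLEDGED_AND_DISCARDED
    }

    /** True when the headers contain a non-empty x-death list. */
    static boolean hasXDeath(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        return xDeath instanceof List && !((List<?>) xDeath).isEmpty();
    }

    /** Maps (fatal?, headers) to the modelled outcome. */
    static Outcome decide(boolean fatal, Map<String, Object> headers) {
        if (!fatal) {
            return Outcome.LEFT_TO_CONTAINER;
        }
        return hasXDeath(headers)
                ? Outcome.ACKNOWLEDGED_AND_DISCARDED
                : Outcome.REJECTED_WITHOUT_REQUEUE;
    }

    public static void main(String[] args) {
        Map<String, Object> parked = new HashMap<>();
        parked.put("x-death", Collections.singletonList(
                Collections.singletonMap("reason", "expired")));

        // A conversion failure on a parked message (x-death present) is dropped.
        System.out.println(decide(true, parked));          // ACKNOWLEDGED_AND_DISCARDED
        // The same failure on a fresh message is only rejected.
        System.out.println(decide(true, new HashMap<>())); // REJECTED_WITHOUT_REQUEUE
    }
}
```

Under this reading, the `x-death` header is not itself the error: it only determines what happens to the message once some fatal exception (here, a message conversion failure) has already occurred.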

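I tried sending multiple permutations and combinations of bad payloads directly to the producer exchange, hoping to reproduce the problem: 404s, 500s, an incorrect API URL, a malformed payload, and so on. All of them were consumed, and the data was stored in the database along with the relevant error logs.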

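One notable finding is that I was able to reproduce this error only once, when I followed this specific set of steps:

  1. Started the service and let it connect to all the relevant queues.
  2. Waited for a while to give the service some idle time.
  3. Published messages directly to the exchange (with an incorrect API URL that throws a 500 Internal Server Error when called). Sent 31 identical messages.
  4. Waited some more for the messages to go through the dead-letter queue and reach the parking lot (about 3 minutes; the TTL in the DL queue is 1 minute).
  5. After the messages reached the parking-lot queue, waited another 5 minutes.
  6. Called the SS API with the start parameter to toggle the consumer on.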

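At that point, 27 of the 31 messages threw the x-death error, and only 4 of them were persisted in the database with the relevant error. But when I tried the same steps later that day (about 2 hours after I reproduced it), I was able to consume all the messages, even though I followed exactly the steps described above.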

Could there be any other reason for this error? Any help would be greatly appreciated.

Note: the dev application (SS) is deployed on AKS (Azure), and the API is also an AKS URL.

PS: If you need any other information, feel free to ask.

java spring-boot rabbitmq azure-aks spring-rabbit
1 Answer

StackOverflow would not let me paste the relevant logs and flagged them as spam. Here are the logs mentioned in the original question:

{"@timestamp":"2024-08-13T04:25:55.666340167Z","@version": "1","message": "Execution of Rabbit message listener failed.","logger_name": "org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler","thread_name": "order-2","level": "WARN","level_value": 30000,"stack_trace": "org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message\n\tat org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:158)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1663)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1582)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1570)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1561)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1506)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1484)\n\tat io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493)\n\tat io.micrometer.observation.Observation.observeWithContext(Observation.java:603)\n\tat io.micrometer.observation.Observation.observe(Observation.java:492)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1484)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:994)\n\tat 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:941)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1323)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.springframework.amqp.support.converter.MessageConversionException: failed to resolve class name. Class not found [com.xyz.sourcing.purchaseorder.domain.Event]\n\tat org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:189)\n\tat org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper.fromTypeHeader(DefaultJackson2JavaTypeMapper.java:146)\n\tat org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:122)\n\tat org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:394)\n\tat org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:364)\n\tat org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:321)\n\tat org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:304)\n\tat org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)\n\tat org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:385)\n\tat 
org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)\n\tat org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:258)\n\tat org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)\n\t... 15 common frames omitted\nCaused by: java.lang.ClassNotFoundException: com.xyz.sourcing.purchaseorder.domain.Event\n\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)\n\tat org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:149)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)\n\tat java.base/java.lang.Class.forName0(Native Method)\n\tat java.base/java.lang.Class.forName(Class.java:467)\n\tat org.springframework.util.ClassUtils.forName(ClassUtils.java:291)\n\tat org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:185)\n\t... 26 common frames omitted\n"}
{"@timestamp":"2024-08-13T04:25:55.666573996Z","@version":"1","message":"Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@4bec94c9(byte[430])' MessageProperties [headers={tracestate=1798415@nr=0-0-1542405-710958860-4d829bd199bb0fa9-fa07f6961c58cb8f-1-1.004289-1723491004180, x-first-death-exchange=gis-order-dl-exchange, traceparent=00-0a0f1f42ed84ec0490c4f08bae1962f8-4d829bd199bb0fa9-01, x-death=[{reason=expired, count=3, exchange=gis-order-dl-exchange, routing-keys=[GIS.ORDER.DEADLETTER], time=Mon Aug 12 19:28:02 GMT 2024, queue=gis_booking_order_deadletter_queue}], x-first-death-reason=expired, x-first-death-queue=gis_booking_order_deadletter_queue, custom_header=[CustomHeader(count=1, errorMessage=404 Not Found from GET https://dev.api.azeus.xyz.com/sourcing/order-repository-service/order-receipt-linkage?orderNumber=XYZ123, eventId=81833449-0b32-4d24-92e1-b7c5f510f3a3, timestamp=Mon Aug 12 19:27:02 GMT 2024), CustomHeader(count=2, errorMessage=404 Not Found from GET https://dev.api.azeus.xyz.com/sourcing/order-repository-service/order-receipt-linkage?orderNumber=XYZ123, eventId=81833449-0b32-4d24-92e1-b7c5f510f3a3, timestamp=Mon Aug 12 19:28:03 GMT 2024), CustomHeader(count=3, errorMessage=404 Not Found from GET https://dev.api.azeus.xyz.com/sourcing/order-repository-service/order-receipt-linkage?orderNumber=XYZ123, eventId=81833449-0b32-4d24-92e1-b7c5f510f3a3, timestamp=Mon Aug 12 19:29:03 GMT 2024)], __TypeId__=com.xyz.sourcing.purchaseorder.domain.Event, newrelic=eyJkIjp7ImFjIjoiMTU0MjQwNSIsInByIjoxLjAwNDI4OSwidHgiOiJmYTA3ZjY5NjFjNThjYjhmIiwidGkiOjE3MjM0OTEwMDQxODAsInR5IjoiQXBwIiwidGsiOiIxNzk4NDE1IiwiaWQiOiI0ZDgyOWJkMTk5YmIwZmE5IiwidHIiOiIwYTBmMWY0MmVkODRlYzA0OTBjNGYwOGJhZTE5NjJmOCIsInNhIjp0cnVlLCJhcCI6IjcxMDk1ODg2MCJ9LCJ2IjpbMCwxXX0=}, timestamp=Mon Aug 12 19:27:00 GMT 2024, contentType=application/json, contentEncoding=UTF-8, contentLength=0, priority=0, redelivered=false, 
receivedExchange=gis-order-dl-exchange, receivedRoutingKey=GIS.ORDER.PARK, deliveryTag=3, consumerTag=amq.ctag-snqN38JmZjVIkqyeAQ7TkA, consumerQueue=gis_booking_order_parkinglot_queue])","logger_name":"org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy","thread_name":"order-2","level":"WARN","level_value":30000}
{"@timestamp":"2024-08-13T04:25:55.66666626Z","@version":"1","message":"x-death header detected on a message with a fatal exception; perhaps requeued from a DLQ? - discarding: (Body:'[B@4bec94c9(byte[430])' MessageProperties [headers={tracestate=1798415@nr=0-0-1542405-710958860-4d829bd199bb0fa9-fa07f6961c58cb8f-1-1.004289-1723491004180, x-first-death-exchange=gis-order-dl-exchange, traceparent=00-0a0f1f42ed84ec0490c4f08bae1962f8-4d829bd199bb0fa9-01, x-death=[{reason=expired, count=3, exchange=gis-order-dl-exchange, routing-keys=[GIS.ORDER.DEADLETTER], time=Mon Aug 12 19:28:02 GMT 2024, queue=gis_booking_order_deadletter_queue}], x-first-death-reason=expired, x-first-death-queue=gis_booking_order_deadletter_queue, custom_header=[CustomHeader(count=1, errorMessage=404 Not Found from GET https://dev.api.azeus.xyz.com/sourcing/order-repository-service/order-receipt-linkage?orderNumber=XYZ123, eventId=81833449-0b32-4d24-92e1-b7c5f510f3a3, timestamp=Mon Aug 12 19:27:02 GMT 2024), CustomHeader(count=2, errorMessage=404 Not Found from GET https://dev.api.azeus.xyz.com/sourcing/order-repository-service/order-receipt-linkage?orderNumber=XYZ123, eventId=81833449-0b32-4d24-92e1-b7c5f510f3a3, timestamp=Mon Aug 12 19:28:03 GMT 2024), CustomHeader(count=3, errorMessage=404 Not Found from GET https://dev.api.azeus.xyz.com/sourcing/order-repository-service/order-receipt-linkage?orderNumber=XYZ123, eventId=81833449-0b32-4d24-92e1-b7c5f510f3a3, timestamp=Mon Aug 12 19:29:03 GMT 2024)], __TypeId__=com.xyz.sourcing.purchaseorder.domain.Event, newrelic=eyJkIjp7ImFjIjoiMTU0MjQwNSIsInByIjoxLjAwNDI4OSwidHgiOiJmYTA3ZjY5NjFjNThjYjhmIiwidGkiOjE3MjM0OTEwMDQxODAsInR5IjoiQXBwIiwidGsiOiIxNzk4NDE1IiwiaWQiOiI0ZDgyOWJkMTk5YmIwZmE5IiwidHIiOiIwYTBmMWY0MmVkODRlYzA0OTBjNGYwOGJhZTE5NjJmOCIsInNhIjp0cnVlLCJhcCI6IjcxMDk1ODg2MCJ9LCJ2IjpbMCwxXX0=}, timestamp=Mon Aug 12 19:27:00 GMT 2024, contentType=application/json, contentEncoding=UTF-8, contentLength=0, priority=0, redelivered=false, 
receivedExchange=gis-order-dl-exchange, receivedRoutingKey=GIS.ORDER.PARK, deliveryTag=3, consumerTag=amq.ctag-snqN38JmZjVIkqyeAQ7TkA, consumerQueue=gis_booking_order_parkinglot_queue])","logger_name":"org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler","thread_name":"order-2","level":"ERROR","level_value":40000}
{"@timestamp":"2024-08-13T04:25:55.666727385Z","@version":"1","message":"Execution of Rabbit message listener failed, and the error handler threw an exception","logger_name":"org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer","thread_name":"order-2","level":"ERROR","level_value":40000,"stack_trace":"org.springframework.amqp.ImmediateAcknowledgeAmqpException: Fatal and x-death present\n\tat org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:143)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1453)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1751)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1527)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1484)\n\tat io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493)\n\tat io.micrometer.observation.Observation.observeWithContext(Observation.java:603)\n\tat io.micrometer.observation.Observation.observe(Observation.java:492)\n\tat org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1484)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:994)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:941)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1323)\n\tat 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n"}
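Reading the first stack trace above, the innermost cause is `java.lang.ClassNotFoundException: com.xyz.sourcing.purchaseorder.domain.Event`, raised while the converter resolves the `__TypeId__` header to a class. The plain-Java sketch below models that lookup: an explicit id-to-class mapping is consulted first, then the classpath. It only illustrates the failure mode and is not the Spring AMQP implementation; `TypeIdResolution` and `LocalEvent` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative model of resolving a type-id header to a class; not Spring AMQP code. */
final class TypeIdResolution {

    /** Hypothetical local class that a consumer might map a foreign type id to. */
    static final class LocalEvent { }

    /** Resolves a type id: explicit mapping first, then the classpath; empty if neither works. */
    static Optional<Class<?>> resolve(String typeId, Map<String, Class<?>> idClassMapping) {
        Class<?> mapped = idClassMapping.get(typeId);
        if (mapped != null) {
            return Optional.of(mapped);
        }
        try {
            return Optional.of(Class.forName(typeId));
        } catch (ClassNotFoundException e) {
            // Corresponds to "failed to resolve class name. Class not found [...]".
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String typeId = "com.xyz.sourcing.purchaseorder.domain.Event";

        // Without a mapping, the publisher's class is not on this classpath.
        System.out.println(resolve(typeId, Collections.emptyMap()).isPresent()); // false

        // With an explicit mapping, the same header resolves to a local class.
        Map<String, Class<?>> mapping = new HashMap<>();
        mapping.put(typeId, LocalEvent.class);
        System.out.println(resolve(typeId, mapping).isPresent());                // true
    }
}
```

If this reading is right, conversion fails before the listener method is invoked, which would be consistent with the payload never being printed in the logs. Whether a type-id mapping is the appropriate fix for SS is an assumption that would need to be checked against the actual converter configuration.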