我正在使用 Java 21 Sprig Boot、RabbitMQ 3.9.28,并且我有许多向 Rabbit 发送消息的组件。我需要向消息标头添加新属性,该属性将是由于消息处理程序中的异常而拒绝消息时的时间戳。我需要它,因为我在不同的错误队列中有数百条消息,并且我不知道异常何时发生 - 我在消息头或正文中看不到任何时间戳,因此我无法导航到正确的日志文件并将消息与日志关联起来。是否有任何拦截器抽象,我可以使用一些示例如何在特定消息处理失败时添加时间戳?
编辑:我没有发现消息处理程序逻辑中的任何错误。另外,我在队列范围内没有任何花哨的错误处理逻辑,我不希望这样..我需要重新实现大量代码和配置,因为有很多处理程序和消息类型..
@Bean
public Queue myQueue() {
Map<String, Object> parameters = new HashMap<>();
parameters.put(X_DEAD_LETTER_EXCHANGE, "");
parameters.put(X_DEAD_LETTER_ROUTING_KEY, myName + ERRORS_POSTFIX);
return new Queue(myName, true, false, false, parameters);
}
@Bean
public Queue myErrorQueue(Queue myQueue) {
Map<String, Object> parameters = new HashMap<>();
return new Queue(myQueue.getName() + ERRORS_POSTFIX, true, false, false, parameters);
}
对。由于您使用标准 RabbitMQ DLX 算法,当消息被应用程序拒绝(nack'ed)时,该算法实际上发生在代理上,因此相同的消息被发送到该 DLX,而无法对其进行修改。
您可能会考虑查看
x-death
标头的一个属性,当 DLX 过期后返回到原始队列时,该属性将出现在消息中。
请参阅相应文档中的更多信息:https://www.rabbitmq.com/docs/dlx
另一种方法是手动重新发布消息,您可以在消息中添加额外信息,因为它将是新消息:https://docs.spring.io/spring-amqp/reference/amqp/resilience-recovering -from-errors-and-broker-failures.html#async-listeners.
参见
RepublishMessageRecoverer.additionalHeaders()
重写方法:
/**
* Subclasses can override this method to add more headers to the republished message.
* @param message The failed message.
* @param cause The cause.
* @return A {@link Map} of additional headers to add.
*/
protected Map<? extends String, ?> additionalHeaders(Message message, Throwable cause) {