Rabbit MQ 自动为每条消息添加标头属性

问题描述 投票:0回答:1

我正在使用 Java 21 Sprig Boot、RabbitMQ 3.9.28,并且我有许多向 Rabbit 发送消息的组件。我需要向消息标头添加新属性,该属性将是由于消息处理程序中的异常而拒绝消息时的时间戳。我需要它,因为我在不同的错误队列中有数百条消息,并且我不知道异常何时发生 - 我在消息头或正文中看不到任何时间戳,因此我无法导航到正确的日志文件并将消息与日志关联起来。是否有任何拦截器抽象,我可以使用一些示例如何在特定消息处理失败时添加时间戳?

编辑:我没有发现消息处理程序逻辑中的任何错误。另外,我在队列范围内没有任何花哨的错误处理逻辑,我不希望这样..我需要重新实现大量代码和配置,因为有很多处理程序和消息类型..

 @Bean
    public Queue myQueue() {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put(X_DEAD_LETTER_EXCHANGE, "");
        parameters.put(X_DEAD_LETTER_ROUTING_KEY, myName + ERRORS_POSTFIX);
        return new Queue(myName, true, false, false, parameters);
    }

    
    @Bean
    public Queue myErrorQueue(Queue myQueue) {
        Map<String, Object> parameters = new HashMap<>();
        return new Queue(myQueue.getName() + ERRORS_POSTFIX, true, false, false, parameters);
    }
java spring rabbitmq spring-rabbit
1个回答
0
投票

对。由于您使用标准 RabbitMQ DLX 算法,当消息被应用程序拒绝(nack'ed)时,该算法实际上发生在代理上,因此相同的消息被发送到该 DLX,而无法对其进行修改。

您可能会考虑查看

x-death
标头的一个属性,当 DLX 过期后返回到原始队列时,该属性将出现在消息中。

请参阅相应文档中的更多信息:https://www.rabbitmq.com/docs/dlx

另一种方法是手动重新发布消息,您可以在消息中添加额外信息,因为它将是新消息:https://docs.spring.io/spring-amqp/reference/amqp/resilience-recovering -from-errors-and-broker-failures.html#async-listeners.

参见

RepublishMessageRecoverer.additionalHeaders()
重写方法:

/**
 * Subclasses can override this method to add more headers to the republished message.
 * @param message The failed message.
 * @param cause The cause.
 * @return A {@link Map} of additional headers to add.
 */
protected Map<? extends String, ?> additionalHeaders(Message message, Throwable cause) {
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.