带有 X-Death 标头的消息的事务回滚

问题描述 投票:0回答:1

我有一个启用了事务管理的 Spring AMQP 配置,它遇到了在我的侦听器中没有发生必要的回滚的场景。

我的交易所配置如下:

  • status
    -
    DirectExchange
    status.dlx
  • 的死信
  • status.dlx
    -
    DirectExchange
    具有生存时间和死信给
    status
  • status.plx
    -
    DirectExchange
    用于将消息路由到停车场

此外,我还配置了

ConditionalRejectingErrorHandler
和异常处理程序,以便丢弃包含 x-death 标头的消息。发生这种情况时,我使用
handleDiscarded(Message failed)
挂钩将消息发送到我们的停车场交换机 (
status.plx
)。

本例中发生异常时的消息传递模式是:

status
-> 监听器遇到异常 ->
status.dlx
->
status
-> 监听器遇到 x-death 异常 ->
handleDiscarded
->
status.plx

由于 Spring AMQP 异常处理文档中的这段代码,此流程成为首选:

处理 DLQ 消息的常见模式是设置这些消息的生存时间以及其他 DLQ 配置,以便这些消息过期并路由回主队列进行重试。这种技术的问题是导致致命异常的消息会永远循环。从版本 2.1 开始,ConditionalRejectingErrorHandler 检测消息上的 x-death 标头,该标头会导致引发致命异常。该消息被记录并丢弃。您可以通过将 ConditionalRejectingErrorHandler 上的discardFatalsWithXDeath 属性设置为 false 来恢复到之前的行为。

这一切都按预期工作,但是,我发现当消息以这种方式被丢弃时,我的侦听器不会回滚事务。

这似乎是由于

ConditionalRejectingErrorHandler.handleError
实现造成的,当存在 x-death 标头时,它会抛出
ImmediateAcknowledgeAmqpException

/**
 * Special exception for listener implementations that want to signal that the current
 * batch of messages should be acknowledged immediately (i.e. as soon as possible) without
 * rollback, and without consuming any more messages within the current transaction.
 *
 * @author Dave Syer
 * @author Gary Russell
 *
 */
@SuppressWarnings("serial")
public class ImmediateAcknowledgeAmqpException extends AmqpException {
    /* omitted */
}

因此,当我的侦听器第一次失败并路由到死信时,侦听器中的数据库处理将回滚。然后,第二次我的侦听器因 x-death 失败并丢弃消息时,尽管遇到异常/最终路由到停车场,我的侦听器将不会回滚事务。

理想情况下,我希望我的侦听器在遇到异常时始终回滚。

我想知道是否可以设置任何类型的标志来完成此操作,或者是否应该更改我的配置。理想情况下,我想利用

ConditionalRejectingErrorHandler.handleDiscarded
,以便通过配置处理所有路由,而不是设置侦听器或手动检查 x-death。

有什么方法可以利用

ConditionalRejectingErrorHandler
并确保在出现 x-death 标头时发生回滚?

如果我无法做到这一点,我正在考虑立即向停车场发送死信,而不是让重试最终为我完成此操作,但我不确定这是否是推荐的方法。

我目前配置了

spring-boot 2.5.2
/
spring-amqp 2.3.9
,我的事务管理器是
JpaTransactionManager

感谢您的宝贵时间。

spring-amqp spring-rabbit
1个回答
0
投票

根据

ImmediateAcknowledgeAmqpException
逻辑,消费者执行必须正常完成,因此我们可以捕获该异常并调用
this.channel.basicAck(deliveryTag, true);
从原始队列中删除这样的消息。

我想知道您是否可以尝试在

handleDiscarded()
中使用您的用例来执行此操作:

TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();

这样您的 JPA 事务将被标记为回滚。

但不确定 AMQP 客户端对此有何反应......

© www.soinside.com 2019 - 2024. All rights reserved.