我有一个启用了事务管理的 Spring AMQP 配置,它遇到了在我的侦听器中没有发生必要的回滚的场景。
我的交易所配置如下:
status
- DirectExchange
与 status.dlx
status.dlx
- DirectExchange
具有生存时间和死信给 status
status.plx
- DirectExchange
用于将消息路由到停车场此外,我还配置了
ConditionalRejectingErrorHandler
和异常处理程序,以便丢弃包含 x-death 标头的消息。发生这种情况时,我使用 handleDiscarded(Message failed)
挂钩将消息发送到我们的停车场交换机 (status.plx
)。
本例中发生异常时的消息传递模式是:
status
-> 监听器遇到异常 -> status.dlx
-> status
-> 监听器遇到 x-death 异常 -> handleDiscarded
-> status.plx
由于 Spring AMQP 异常处理文档中的这段代码,此流程成为首选:
处理 DLQ 消息的常见模式是设置这些消息的生存时间以及其他 DLQ 配置,以便这些消息过期并路由回主队列进行重试。这种技术的问题是导致致命异常的消息会永远循环。从版本 2.1 开始,ConditionalRejectingErrorHandler 检测消息上的 x-death 标头,该标头会导致引发致命异常。该消息被记录并丢弃。您可以通过将 ConditionalRejectingErrorHandler 上的discardFatalsWithXDeath 属性设置为 false 来恢复到之前的行为。
这一切都按预期工作,但是,我发现当消息以这种方式被丢弃时,我的侦听器不会回滚事务。
这似乎是由于
ConditionalRejectingErrorHandler.handleError
实现造成的,当存在 x-death 标头时,它会抛出 ImmediateAcknowledgeAmqpException
:
/**
* Special exception for listener implementations that want to signal that the current
* batch of messages should be acknowledged immediately (i.e. as soon as possible) without
* rollback, and without consuming any more messages within the current transaction.
*
* @author Dave Syer
* @author Gary Russell
*
*/
@SuppressWarnings("serial")
public class ImmediateAcknowledgeAmqpException extends AmqpException {
/* omitted */
}
因此,当我的侦听器第一次失败并路由到死信时,侦听器中的数据库处理将回滚。然后,第二次我的侦听器因 x-death 失败并丢弃消息时,尽管遇到异常/最终路由到停车场,我的侦听器将不会回滚事务。
理想情况下,我希望我的侦听器在遇到异常时始终回滚。
我想知道是否可以设置任何类型的标志来完成此操作,或者是否应该更改我的配置。理想情况下,我想利用
ConditionalRejectingErrorHandler.handleDiscarded
,以便通过配置处理所有路由,而不是设置侦听器或手动检查 x-death。
有什么方法可以利用
ConditionalRejectingErrorHandler
并确保在出现 x-death 标头时发生回滚?
如果我无法做到这一点,我正在考虑立即向停车场发送死信,而不是让重试最终为我完成此操作,但我不确定这是否是推荐的方法。
我目前配置了
spring-boot 2.5.2
/ spring-amqp 2.3.9
,我的事务管理器是 JpaTransactionManager
感谢您的宝贵时间。
根据
ImmediateAcknowledgeAmqpException
逻辑,消费者执行必须正常完成,因此我们可以捕获该异常并调用this.channel.basicAck(deliveryTag, true);
从原始队列中删除这样的消息。
我想知道您是否可以尝试在
handleDiscarded()
中使用您的用例来执行此操作:
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
这样您的 JPA 事务将被标记为回滚。
但不确定 AMQP 客户端对此有何反应......