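I'm having trouble managing RabbitMQ and database transactions together when the exchange is not found. Here is the simple sequence:
When the exchange is not found, the message is not sent, but the row is still updated in the database, which breaks transactional behavior. The transaction is handled correctly in the other error cases (database error or RabbitMQ unavailable).
How can I handle this use case transactionally?
Transactions are enabled in the configuration: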
@Bean
@ConditionalOnMissingClass("org.springframework.orm.jpa.JpaTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(jacksonMessageConverter());
    // Publish on a transacted channel so sends join the surrounding transaction
    template.setChannelTransacted(true);
    return template;
}
My service:
@Override
@Transactional
public void push(Message message) {
    rabbitTemplate.convertAndSend(
            "MessageExchange",
            "binding.key",
            objectMapper.writeValueAsString(message));
    repository.markAsSent(message.getId());
}
The error is raised after leaving the method, not inside the rabbitTemplate.convertAndSend call:
[AMQP Connection] ERROR o.s.a.r.c.CachingConnectionFactory.log : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'MessageExchange' in vhost '/', class-id=60, method-id=40)
[ThreadPoolTaskScheduler1] ERROR o.s.t.s.TransactionSynchronizationUtils.invokeAfterCompletion : TransactionSynchronization.afterCompletion threw exception
java.lang.IllegalStateException: Channel closed during transaction
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1171)
at com.sun.proxy.$Proxy143.txCommit(Unknown Source)
at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:153)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceSynchronization.afterCompletion(ConnectionFactoryUtils.java:332)
Try using the Jakarta Transactional annotation instead of the default Spring Framework annotation:
@Override
@jakarta.transaction.Transactional
public void push(Message message) {
    rabbitTemplate.convertAndSend(
            "MessageExchange",
            "binding.key",
            objectMapper.writeValueAsString(message));
    repository.markAsSent(message.getId());
}
This should solve your problem.
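
For background on why the row can be updated even though the publish fails: the stack trace in the question shows txCommit being called from a TransactionSynchronization.afterCompletion callback, which Spring invokes only after the main transaction has completed, and the log line shows the resulting exception being logged rather than propagated. The toy model below is plain Java with no Spring or RabbitMQ involved. Every class and method name in it (AfterCompletionOrderDemo, ToyTransaction, Synchronization, run) is hypothetical and exists only to illustrate that ordering. It is a rough sketch of the sequence, not the framework's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class AfterCompletionOrderDemo {

    /** Hypothetical stand-in for a callback registered with the transaction. */
    interface Synchronization {
        void afterCompletion();
    }

    /** Toy transaction: commits the "database" first, then runs the callbacks. */
    static class ToyTransaction {
        private final List<Synchronization> synchronizations = new ArrayList<>();
        private final List<String> log;

        ToyTransaction(List<String> log) {
            this.log = log;
        }

        void register(Synchronization synchronization) {
            synchronizations.add(synchronization);
        }

        void commit() {
            log.add("db commit");
            for (Synchronization synchronization : synchronizations) {
                try {
                    synchronization.afterCompletion();
                } catch (RuntimeException e) {
                    // Only recorded: the database commit above has already happened.
                    log.add("afterCompletion threw: " + e.getMessage());
                }
            }
        }
    }

    /** Replays the push() sequence and returns the order of events. */
    static List<String> run(boolean exchangeExists) {
        List<String> log = new ArrayList<>();
        ToyTransaction tx = new ToyTransaction(log);

        // "Publish": nothing fails here, the broker-side commit is deferred.
        log.add("publish buffered");
        tx.register(() -> {
            if (!exchangeExists) {
                throw new IllegalStateException("Channel closed during transaction");
            }
            log.add("broker commit");
        });

        log.add("row marked as sent");
        tx.commit();
        return log;
    }

    public static void main(String[] args) {
        run(false).forEach(System.out::println);
    }
}
```

In this model, run(false) records the database commit before the failed broker commit, which mirrors the symptom described in the question: the row is marked as sent although no message was delivered.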