我正在使用 spring-boot-starter-parent 3.2.3 中的 spring-amqp。
我有以下配置:
@Bean
@RefreshScope
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setHost(host);
connectionFactory.setPort(port);
return connectionFactory;
}
ListenerContainerFactory 设置如下:
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
MessageConverter messageConverter,
LoggingMessagePostProcessor loggingMessagePostProcessor,
RetryOperationsInterceptor messageRetryInterceptor,
SimpleRabbitListenerContainerFactoryConfigurer configurer,
PlatformTransactionManager platformTransactionManager) {
var factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(messageConverter);
factory.setDefaultRequeueRejected(false);
factory.setPrefetchCount(getPrefetchCount());
factory.setConcurrentConsumers(getConcurrentConsumers());
factory.setMaxConcurrentConsumers(getMaxConcurrentConsumers());
factory.setAdviceChain(messageRetryInterceptor);
factory.setAfterReceivePostProcessors(loggingMessagePostProcessor);
factory.setChannelTransacted(true);
factory.setTransactionManager(platformTransactionManager);
return factory;
}
(注意已设置交易通道和交易管理器)
我的工作流程如下:
通过带有
@RabbitListener
注释的方法接收异步消息
通过
@Transactional
方法使用 spring jpa 持久化到数据库
使用AmqpTemplate生成同步消息
我遇到的问题是,当我们通过调用执行器端点定期刷新上面定义的
ConnectionFactory
bean 的 AMQP 凭证时,它会在日志中导致错误,如下所示:
{"timestamp":"2024-07-28T02:47:11.683026026Z","loggerClass":"org.springframework.transaction.support.TransactionTemplate","thread":"rabbit-simple-4","level":"ERROR","stackTrace":"com.rabbitmq.client.ShutdownSignalException: clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)\n\tat com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:1007)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1127)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1056)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1040)\n\tat org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)\n\tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741)\n\tat org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)\n\tat java.base/java.lang.VirtualThread.run(Unknown Source)\n","CORRELATION_ID":"dae77ebc-7636-4763-b703-975f3f9174e6","throwable_class":"ShutdownSignalException","serviceName":"My-Service","message":["Application exception overridden by rollback exception"],"serverName":"My-Service-775d5db996-c8m9s"}
应用程序功能没有出现任何问题。我相信这只是摧毁旧的联系并建立新的联系。问题是它是否可以优雅地完成而不导致日志中记录错误级别?
有趣的是,如果我删除行
factory.setTransactionManager(platformTransactionManager);
,日志中的错误消息就会消失。
在刷新该豆子之前,您必须考虑停止您的
@RabbitListener
容器。 (但不确定如何做到这一点)。每次 receive
调用都会启动事务,并且它确实与连接相关联。因此,如果您的连接工厂在中间刷新,那么它确实可能在事务提交之前被破坏。
如果您可以与我们分享一个可以在我们这边重现的简单项目,那就太好了。如果我们在 Spring AMQP 中针对此用例缺少某些内容,我不会介意。