Spring AMQP 和刷新范围 bean

问题描述 投票:0回答:1

我正在使用 spring-boot-starter-parent 3.2.3 中的 spring-amqp。

我有以下配置:

  @Bean
  @RefreshScope
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(virtualHost);
    connectionFactory.setHost(host);
    connectionFactory.setPort(port);
    return connectionFactory;
  }

ListenerContainerFactory 设置如下:

public SimpleRabbitListenerContainerFactory
      rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
                                 MessageConverter messageConverter,
                                 LoggingMessagePostProcessor loggingMessagePostProcessor,
                                 RetryOperationsInterceptor messageRetryInterceptor,
                                 SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                 PlatformTransactionManager platformTransactionManager) {

    var factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setMessageConverter(messageConverter);
    factory.setDefaultRequeueRejected(false);
    factory.setPrefetchCount(getPrefetchCount());
    factory.setConcurrentConsumers(getConcurrentConsumers());
    factory.setMaxConcurrentConsumers(getMaxConcurrentConsumers());
    factory.setAdviceChain(messageRetryInterceptor);
    factory.setAfterReceivePostProcessors(loggingMessagePostProcessor);
    factory.setChannelTransacted(true);
    factory.setTransactionManager(platformTransactionManager);
    return factory;
  }

(注意已设置交易通道和交易管理器)

我的工作流程如下:

  1. 通过带有

    @RabbitListener

    注释的方法接收异步消息
  2. 通过

    @Transactional
    方法使用 spring jpa 持久化到数据库

  3. 使用AmqpTemplate生成同步消息

我遇到的问题是,当我们通过调用执行器端点定期刷新上面定义的

ConnectionFactory
bean 的 AMQP 凭证时,它会在日志中导致错误,如下所示:

{"timestamp":"2024-07-28T02:47:11.683026026Z","loggerClass":"org.springframework.transaction.support.TransactionTemplate","thread":"rabbit-simple-4","level":"ERROR","stackTrace":"com.rabbitmq.client.ShutdownSignalException: clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)\n\tat com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:1007)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1127)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1056)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1040)\n\tat   org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)\n\tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741)\n\tat org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)\n\tat java.base/java.lang.VirtualThread.run(Unknown Source)\n","CORRELATION_ID":"dae77ebc-7636-4763-b703-975f3f9174e6","throwable_class":"ShutdownSignalException","serviceName":"My-Service","message":["Application exception overridden by rollback exception"],"serverName":"My-Service-775d5db996-c8m9s"}

应用程序功能没有出现任何问题。我相信这只是摧毁旧的联系并建立新的联系。问题是它是否可以优雅地完成而不导致日志中记录错误级别?

有趣的是,如果我删除行

factory.setTransactionManager(platformTransactionManager);
,日志中的错误消息就会消失。

java spring spring-boot rabbitmq spring-amqp
1个回答
0
投票

在刷新该豆子之前,您必须考虑停止您的

@RabbitListener
容器。 (但不确定如何做到这一点)。每次
receive
调用都会启动事务,并且它确实与连接相关联。因此,如果您的连接工厂在中间刷新,那么它确实可能在事务提交之前被破坏。

如果您可以与我们分享一个可以在我们这边重现的简单项目,那就太好了。如果我们在 Spring AMQP 中针对此用例缺少某些内容,我不会介意。

© www.soinside.com 2019 - 2024. All rights reserved.