在DefaultMessageListenerContainer中配置sessionAcknowledgeMode

问题描述 投票:0回答:3

我有一个设置,我必须从 ActiveMQ 代理中的队列中读取消息。读取消息后,我必须对该消息执行长时间运行的操作。

由于对消息的长时间运行操作,我想尽快确认该消息,以便释放代理上的资源。计划是在收到消息后执行以下步骤:

  1. 从ActiveMQ获取消息
  2. 将消息插入数据库
  3. 确认消息
  4. 对消息进行一些长时间运行的操作

我已经阅读了有关 JMS 和不同确认模式的信息,因此在尝试这样做之前,我决定建立一个应用程序,在其中我可以尝试不同的模式以了解它们的处理方式,不幸的是,我似乎无法获得所需的输出.

遵循此答案中的信息https://stackoverflow.com/a/10188078以及https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/ listener/DefaultMessageListenerContainer.html 我认为通过使用 AUTO_ACKNOWLEDGE 消息将在调用侦听器之前得到确认,但是如果我在侦听器中抛出异常,消息将被重新传递。

我尝试过将 setSessionTransacted 设置为 true 和不设置 setSessionTransacted ,但在这两种情况下我都得到相同的输出。当 JmsListener 中抛出异常时,消息将被重新传递。

JMS的配置

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(jmsConfig.getBrokerUrl());

        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmstemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        //jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setDefaultDestinationName( jmsConfig.getQueueIn() );
        return jmsTemplate;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerFactoryxxxx(
                    ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        //factory.setConcurrency("1");
        factory.setSessionTransacted(true);

        configurer.configure(factory, connectionFactory);
        return factory;
    }

JmsListener

@JmsListener(destination = "B1Q1", containerFactory = "jmsListenerContainerFactoryxxxx")
public  void receiveMessage(Message message) {
    try {
        TextMessage m = (TextMessage) message;
        String messageText = m.getText();

        int retryNum = message.getIntProperty("JMSXDeliveryCount");
        long s = message.getLongProperty("JMSTimestamp");

        Date d = new Date( s );

        String dbText = String.format("Retry %d. Message: %s", retryNum, messageText);

        if ( messageText.toLowerCase().contains("exception") ) {
            logger.info("Creating exception for retry: {}", retryNum);
            throw new RuntimeException();
        }
    } catch (JMSException e) {
        logger.error("Exception!!", e);
    }
}

我应该如何更改代码,以便在抛出异常时不重新发送消息?

回到我的应用程序,我将在其中将消息插入数据库。在消息插入数据库之后、执行长时间运行的任务之前,如何通过 JmsListener 确认消息?

spring-boot activemq-classic spring-jms
3个回答
4
投票

为了能够使用

AUTO_ACKNOWLEDGE
CLIENT_ACKNOWLEDGE
,我必须在配置工厂后调用
factory.setSessionTransacted(false)

调用

configurer.configure(factory, connectionFactory)
会覆盖
sessionTransacted
的值,在我的例子中,它将其设置为
true
,这使得
AUTO_ACKNOWLEDGE
CLIENT_ACKNOWLEDGE
无效。这是
DefaultJmsListenerContainerFactoryConfigurer.java
的相关代码:

public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
...
...
        if (this.transactionManager != null) {
            factory.setTransactionManager(this.transactionManager);
        } else {
            factory.setSessionTransacted(true);
        }
...
...

0
投票
          factory.setSessionAcknowledgeMode(Tibjms.EXPLICIT_CLIENT_ACKNOWLEDGE);
          //factory.setSessionTransacted(false);// here it’s  not working
          factory.setTaskExecutor(new SimpleAsyncTaskExecutor("KDBMessageListener-"));
          configurer.configure(factory, connectionFactory);
          factory.setSessionTransacted(false); //post configure ,session transacted is working

0
投票

我尝试过这个配置。

@Bean
    JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setSessionTransacted(false);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

没成功。

© www.soinside.com 2019 - 2024. All rights reserved.