Jakarta JMS with Spring Boot: sending messages immediately

Question (votes: 0, answers: 1)

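I need to send some messages without waiting for the transaction to end. But none of the settings I have tried, such as

    template.setSessionTransacted(false);

or different acknowledgement modes, had any effect.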

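I am using the JMS library. It currently runs with the default value for every setting, but I have tried enabling and disabling transactions in every mode I know of. My dependencies: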

    implementation 'org.apache.activemq:artemis-jakarta-client:2.38.0'
    implementation 'org.apache.activemq:artemis-core-client:2.38.0'
    implementation 'org.apache.activemq:artemis-commons:2.38.0'
    implementation 'org.springframework:spring-jms:6.1.14'

My JMS configuration:

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(format("tcp://%s:%s", host, port));
        connectionFactory.setUser(user);
        connectionFactory.setPassword(pass);

        return connectionFactory;
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {

        Logger logger = (Logger) LoggerFactory.getLogger("org.springframework.jms");
        logger.setLevel(Level.ERROR);

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setErrorHandler(new CustomErrorHandler());
        configurer.configure(factory, connectionFactory);

        return factory;
    }

    @Bean
    public JmsTemplate jmsAnycastTemplate(ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }

Worker client code:

    public static void doAllWork(int moduleId, MqProducer mqProducer, MqMessage message){
        mqProducer.sendAcceptToWork(moduleId, message);

        Work.doSomeWork(moduleId, mqProducer);

        mqProducer.sendEndWork(moduleId, message, 200L);
    }

    @SneakyThrows
    public static void doSomeWork(int moduleId, MqProducer mqProducer){

        int secToWork = ThreadLocalRandom.current().nextInt(1, 10);
        LocalDateTime workstarted = LocalDateTime.now();

        while (Duration.between(workstarted, LocalDateTime.now()).getSeconds() < secToWork) {
            Thread.sleep(1000);
            System.out.printf("%s Working...%n", LocalDateTime.now());
            mqProducer.sendMessage("api", new MqMessage(IN_PROGRESS, moduleId, 200));
        }
    }

And this is the output from the worker client:

sendAcceptToWork 3 0
2024-12-19T13:03:45.046627435 Working...
2024-12-19T13:03:46.057809869 Working...
2024-12-19T13:03:47.065647593 Working...
2024-12-19T13:03:48.074799444 Working...
2024-12-19T13:03:49.080706764 Working...
2024-12-19T13:03:50.088089181 Working...
2024-12-19T13:03:51.095799844 Working...
2024-12-19T13:03:52.104492990 Working...
2024-12-19T13:03:53.113162971 Working...
Work done

This is the API client code:

    @SneakyThrows
    @Override
    public void run(MqMessage message) {

        if (message.getMqOperationCode() == MqOperationCode.WORK_STARTED){
            System.out.println();
            System.out.printf("%s [api] WORK_STARTED message received%n", LocalDateTime.now());
            return;
        }
        if (message.getMqOperationCode() == MqOperationCode.IN_PROGRESS){
            System.out.printf("%s [api] IN_PROGRESS message received%n", LocalDateTime.now());
            return;
        }
        if (message.getMqOperationCode() == MqOperationCode.WORK_ENDED){
            System.out.printf("%s [api] WORK_ENDED message received%n", LocalDateTime.now());
            return;
        }

        System.out.printf("%s [api] message received : \n%s%n", LocalDateTime.now(), message);
    }

And its output for the same iteration:

2024-12-19T13:03:53.141036336 [api] WORK_STARTED message received
2024-12-19T13:03:53.146047337 [api] IN_PROGRESS message received
2024-12-19T13:03:53.150053390 [api] IN_PROGRESS message received
2024-12-19T13:03:53.153668788 [api] IN_PROGRESS message received
2024-12-19T13:03:53.157834576 [api] IN_PROGRESS message received
2024-12-19T13:03:53.161060836 [api] IN_PROGRESS message received
2024-12-19T13:03:53.163221293 [api] IN_PROGRESS message received
2024-12-19T13:03:53.165163888 [api] IN_PROGRESS message received
2024-12-19T13:03:53.166759323 [api] IN_PROGRESS message received
2024-12-19T13:03:53.168365120 [api] IN_PROGRESS message received
2024-12-19T13:03:53.169784977 [api] WORK_ENDED message received

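As you can see, the API client receives every message at 13:03:53, only after the worker has finished, so the worker's messages are all delivered at once. I need each message to be sent immediately, without waiting for the transaction to end.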

java jms activemq-classic activemq-artemis
1 Answer
0 votes

If you want to send JMS messages immediately without waiting for the transaction to end, try the following:

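  1. Disable transactions on the JmsTemplate:
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    // Disable transactions for the JMS session
    jmsTemplate.setSessionTransacted(false);
    // Optional: use non-persistent delivery for speed.
    // JmsTemplate only applies the delivery mode when explicit QoS is enabled.
    jmsTemplate.setExplicitQosEnabled(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  2. If the ConnectionFactory is transactional, it can override your settings. Add this to your ConnectionFactory:
    // Send messages asynchronously to avoid delays.
    // Note: setUseAsyncSend belongs to the ActiveMQ Classic ActiveMQConnectionFactory.
    // With the Artemis client from the question, the comparable settings are
    // setBlockOnDurableSend(false) and setBlockOnNonDurableSend(false).
    connectionFactory.setUseAsyncSend(true);
  3. Check that the method that sends the messages is not transactional.
  4. To make sure the acknowledgement mode is set for non-transacted sessions, modify the myFactory bean:
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
  5. You can also check that the ActiveMQ Artemis broker is not introducing delays or buffering messages. In the Artemis broker.xml configuration you could try enabling immediate delivery and asynchronous send acknowledgements as shown below, but first confirm that the broker.xml schema of your Artemis version accepts these element names:
    <address-settings>
        <address-setting match="#">
            <immediate-delivery>true</immediate-delivery>
            <send-acks-async>true</send-acks-async>
        </address-setting>
    </address-settings>
  6. One more thing that can help is enabling more logging:
    logging.level.org.springframework.jms=DEBUG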
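
To picture why the check that the sending method is not transactional matters, here is a toy model in plain Java. It is only a sketch and does not use the JMS or Spring APIs: `Broker`, `ToySession` and `visibleBeforeCommit` are hypothetical names invented for this illustration. It assumes the usual JMS rule that messages sent on a transacted session become visible to consumers only at commit, which would match the output in the question, where all eleven messages arrive together at 13:03:53.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of transacted vs. non-transacted sends; not the real JMS API. */
public class TransactedSendDemo {

    /** Stands in for the broker: holds the messages consumers can already see. */
    static final class Broker {
        final List<String> delivered = new ArrayList<>();
    }

    /** Stands in for a JMS session that is either transacted or not. */
    static final class ToySession {
        private final Broker broker;
        private final boolean transacted;
        private final List<String> pending = new ArrayList<>();

        ToySession(Broker broker, boolean transacted) {
            this.broker = broker;
            this.transacted = transacted;
        }

        void send(String message) {
            if (transacted) {
                pending.add(message);          // held back until commit()
            } else {
                broker.delivered.add(message); // visible to consumers at once
            }
        }

        void commit() {
            broker.delivered.addAll(pending);  // everything is released in one go
            pending.clear();
        }
    }

    /** Replays the worker's send sequence and reports how many messages
     *  the broker could see before the commit happened. */
    public static int visibleBeforeCommit(boolean transacted, int progressMessages) {
        Broker broker = new Broker();
        ToySession session = new ToySession(broker, transacted);
        session.send("WORK_STARTED");
        for (int i = 0; i < progressMessages; i++) {
            session.send("IN_PROGRESS");
        }
        session.send("WORK_ENDED");
        int visible = broker.delivered.size();
        session.commit();
        return visible;
    }

    public static void main(String[] args) {
        System.out.println("transacted:     " + visibleBeforeCommit(true, 9));  // prints 0
        System.out.println("non-transacted: " + visibleBeforeCommit(false, 9)); // prints 11
    }
}
```

If your real application behaves like the transacted case even after setSessionTransacted(false), something outside the JmsTemplate is probably wrapping the send in a transaction, which is what the DEBUG logging should help you confirm.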

Hope this helps.
