我需要发送一些消息而不等待交易结束。 但我尝试过的所有设置都喜欢
template.setSessionTransacted(false);
并且设置不同的确认模式并没有带来任何结果。
我使用的是 jms 库。 现在它使用所有设置默认值,但我尝试使用所有知识模块启用结束禁用事务模块
implementation 'org.apache.activemq:artemis-jakarta-client:2.38.0'
implementation 'org.apache.activemq:artemis-core-client:2.38.0'
implementation 'org.apache.activemq:artemis-commons:2.38.0'
implementation 'org.springframework:spring-jms:6.1.14'
我的 JMS 配置
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(format("tcp://%s:%s", host, port));
connectionFactory.setUser(user);
connectionFactory.setPassword(pass);
return connectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
Logger logger = (Logger) LoggerFactory.getLogger("org.springframework.jms");
logger.setLevel(Level.ERROR);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setErrorHandler(new CustomErrorHandler());
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public JmsTemplate jmsAnycastTemplate(ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
Worker 客户端代码
public static void doAllWork(int moduleId, MqProducer mqProducer, MqMessage message){
mqProducer.sendAcceptToWork(moduleId, message);
Work.doSomeWork(moduleId, mqProducer);
mqProducer.sendEndWork(moduleId, message, 200L);
}
@SneakyThrows
public static void doSomeWork(int moduleId, MqProducer mqProducer){
int secToWork = ThreadLocalRandom.current().nextInt(1, 10);
LocalDateTime workstarted = LocalDateTime.now();
while (Duration.between(workstarted, LocalDateTime.now()).getSeconds() < secToWork) {
Thread.sleep(1000);
System.out.printf("%s [api] WORK_ENDED message received%n", LocalDateTime.now());
mqProducer.sendMessage("api", new MqMessage(IN_PROGRESS, moduleId, 200));
}
}
并且有来自工作客户端的输出
sendAcceptToWork 3 0
2024-12-19T13:03:45.046627435 Working...
2024-12-19T13:03:46.057809869 Working...
2024-12-19T13:03:47.065647593 Working...
2024-12-19T13:03:48.074799444 Working...
2024-12-19T13:03:49.080706764 Working...
2024-12-19T13:03:50.088089181 Working...
2024-12-19T13:03:51.095799844 Working...
2024-12-19T13:03:52.104492990 Working...
2024-12-19T13:03:53.113162971 Working...
Work done
有代码api客户端
@SneakyThrows
@Override
public void run(MqMessage message) {
if (message.getMqOperationCode() == MqOperationCode.WORK_STARTED){
System.out.println();
System.out.printf("%s [api] WORK_STARTED message received%n", LocalDateTime.now());
return;
}
if (message.getMqOperationCode() == MqOperationCode.IN_PROGRESS){
System.out.printf("%s [api] IN_PROGRESS message received%n", LocalDateTime.now());
return;
}
if (message.getMqOperationCode() == MqOperationCode.WORK_ENDED){
System.out.printf("%s [api] WORK_ENDED message received%n", LocalDateTime.now());
return;
}
System.out.printf("%s [api] message received : \n%s%n", LocalDateTime.now(), message);
}
并在同一迭代中输出
2024-12-19T13:03:53.141036336 [api] WORK_STARTED message received
2024-12-19T13:03:53.146047337 [api] IN_PROGRESS message received
2024-12-19T13:03:53.150053390 [api] IN_PROGRESS message received
2024-12-19T13:03:53.153668788 [api] IN_PROGRESS message received
2024-12-19T13:03:53.157834576 [api] IN_PROGRESS message received
2024-12-19T13:03:53.161060836 [api] IN_PROGRESS message received
2024-12-19T13:03:53.163221293 [api] IN_PROGRESS message received
2024-12-19T13:03:53.165163888 [api] IN_PROGRESS message received
2024-12-19T13:03:53.166759323 [api] IN_PROGRESS message received
2024-12-19T13:03:53.168365120 [api] IN_PROGRESS message received
2024-12-19T13:03:53.169784977 [api] WORK_ENDED message received
如您所见,工作客户端一次性发送所有消息,但我需要发送每条消息而不等待事务结束。
如果您想立即发送 JMS 消息而不等待事务结束,请尝试这样做:
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
// Disable transaction for the JMS session
jmsTemplate.setSessionTransacted(false);
// Optional: Use non-persistent delivery for speed
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Send messages asynchronously to avoid delays
connectionFactory.setUseAsyncSend(true);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
<address-settings>
<address-setting match="#">
<immediate-delivery>true</immediate-delivery>
<send-acks-async>true</send-acks-async>
</address-setting>
</address-settings>
logging.level.org.springframework.jms=DEBUG
希望对您有帮助。