我有一个设置,我必须从 ActiveMQ 代理中的队列中读取消息。读取消息后,我必须对该消息执行长时间运行的操作。
由于对消息的长时间运行操作,我想尽快确认该消息,以便释放代理上的资源。计划是在收到消息后执行以下步骤:
我已经阅读了有关 JMS 和不同确认模式的信息,因此在尝试这样做之前,我决定建立一个应用程序,在其中我可以尝试不同的模式以了解它们的处理方式,不幸的是,我似乎无法获得所需的输出.
遵循此答案中的信息https://stackoverflow.com/a/10188078以及https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/ listener/DefaultMessageListenerContainer.html 我认为通过使用 AUTO_ACKNOWLEDGE 消息将在调用侦听器之前得到确认,但是如果我在侦听器中抛出异常,消息将被重新传递。
我尝试过将 setSessionTransacted 设置为 true 和不设置 setSessionTransacted ,但在这两种情况下我都得到相同的输出。当 JmsListener 中抛出异常时,消息将被重新传递。
JMS的配置
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(jmsConfig.getBrokerUrl());
return connectionFactory;
}
@Bean
public JmsTemplate jmstemplate(){
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory());
//jmsTemplate.setSessionTransacted(true);
jmsTemplate.setDefaultDestinationName( jmsConfig.getQueueIn() );
return jmsTemplate;
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactoryxxxx(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
//factory.setConcurrency("1");
factory.setSessionTransacted(true);
configurer.configure(factory, connectionFactory);
return factory;
}
JmsListener
@JmsListener(destination = "B1Q1", containerFactory = "jmsListenerContainerFactoryxxxx")
public void receiveMessage(Message message) {
try {
TextMessage m = (TextMessage) message;
String messageText = m.getText();
int retryNum = message.getIntProperty("JMSXDeliveryCount");
long s = message.getLongProperty("JMSTimestamp");
Date d = new Date( s );
String dbText = String.format("Retry %d. Message: %s", retryNum, messageText);
if ( messageText.toLowerCase().contains("exception") ) {
logger.info("Creating exception for retry: {}", retryNum);
throw new RuntimeException();
}
} catch (JMSException e) {
logger.error("Exception!!", e);
}
}
我应该如何更改代码,以便在抛出异常时不重新发送消息?
回到我的应用程序,我将在其中将消息插入数据库。在消息插入数据库之后、执行长时间运行的任务之前,如何通过 JmsListener 确认消息?
为了能够使用
AUTO_ACKNOWLEDGE
或CLIENT_ACKNOWLEDGE
,我必须在配置工厂后调用factory.setSessionTransacted(false)
。
调用
configurer.configure(factory, connectionFactory)
会覆盖 sessionTransacted
的值,在我的例子中,它将其设置为 true
,这使得 AUTO_ACKNOWLEDGE
或 CLIENT_ACKNOWLEDGE
无效。这是DefaultJmsListenerContainerFactoryConfigurer.java
的相关代码:
public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
...
...
if (this.transactionManager != null) {
factory.setTransactionManager(this.transactionManager);
} else {
factory.setSessionTransacted(true);
}
...
...
factory.setSessionAcknowledgeMode(Tibjms.EXPLICIT_CLIENT_ACKNOWLEDGE);
//factory.setSessionTransacted(false);// here it’s not working
factory.setTaskExecutor(new SimpleAsyncTaskExecutor("KDBMessageListener-"));
configurer.configure(factory, connectionFactory);
factory.setSessionTransacted(false); //post configure ,session transacted is working
我尝试过这个配置。
@Bean
JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setSessionTransacted(false);
configurer.configure(factory, connectionFactory);
return factory;
}
没成功。