我目前使用的是 Spring Boot 版本 3.3.0 和 Java 21。
我在将
SessionAcknowledgeMode
设置为 Session.CLIENT_ACKNOWLEDGE
时遇到问题。我正在尝试在成功处理后手动确认消息。因此,为了实现这一目标,我尝试将 SessionAcknowledgeMode
设置为 Session.CLIENT_ACKNOWLEDGE
但它不起作用。我的监听器正在自动确认消息。尽管尝试了多种方法,但它并没有达到预期的效果。任何指导或解决方案将不胜感激!
pom.xml 文件:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.0</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
我的听众:
@JmsListener(destination = "test.queue", containerFactory = "jmsListenerContainerFactory")
public void configure(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
log.info("Received message {} ", textMessage.getText());
// Commenting out the acknowledgement
//message.acknowledge();
} catch (Exception e) {
log.error("Message processing failed.");
}
}
我已经在代码中注释掉了
message.acknowledge();
,但是消息仍然从ActiveMQ中删除。
这是我的配置:
@Bean
JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setSessionTransacted(false);
return factory;
}
当我使用手动代码片段进行测试时,它的行为符合预期 - 如果我不确认消息,则不会从队列中删除消息。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("test", "test",
"tcp://localhost:61616");
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
Queue queue = session.createQueue("bharad.test");
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
Message message = consumer.receive(1000);
if (message != null) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
message.acknowledge();
} else {
System.out.println("No messages in queue");
break;
}
}
}
}
答案实际上写在 SimpleJmsListenerContainer 的文档中:
请注意,此容器公开了默认“AUTO_ACKNOWLEDGE”模式的标准 JMS 行为:即,侦听器执行后自动消息确认,在抛出用户异常的情况下不会重新传递,但在侦听器执行期间 JVM 死亡的情况下可能会重新传递
对于不同风格的 MessageListener 处理,通过循环的 MessageConsumer.receive() 调用也允许事务性接收消息(使用 XA 事务注册它们),请参阅 DefaultMessageListenerContainer。
因此它实际上只提供 AUTO_ACKNOWLEDGE 行为,并且不可配置其他行为。
如果你想配置其他行为,请使用 DefaultMessageListenerContainer