我刚刚开始考虑如何在 Spring 中工作。到目前为止,我的消费者工作得很好,除了当我不确认消息时,它仍然从队列中取出(我希望它留在那里或以死信队列结束)。
JMS Acknowledgements
在 ConsumerClass 中,我的简单消费者看起来像这样:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- A JMS connection factory for ActiveMQ -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="failover://(tcp://jms1:61616,tcp://jms2:61616)?randomize=false&jms.redeliveryPolicy.maximumRedeliveries=5" />
<!-- A POJO that implements the JMS message listener -->
<bean id="simpleMessageListener" class="com.company.ConsumerClass" />
<!-- A JMS namespace aware Spring configuration for the message listener container -->
<jms:listener-container
container-type="default"
connection-factory="connectionFactory"
acknowledge="client"
concurrency="10-50"
cache="consumer">
<jms:listener destination="someQueue" ref="simpleMessageListener" method="onMessage" />
</jms:listener-container>
</beans>
@Override public final void onMessage(Message message) {
Object postedMessage = null;
try {
postedMessage = ((ObjectMessage) message).getObject();
if (postedMessage.getClass() == SomeMessageType.class) {
try {
//Some logic here
message.acknowledge();
return; //Success Here
} catch (MyException e) {
logger.error("Could not process message, but as I didn't call acknowledge I expect it to end up in the dead message queue");
}
}
} catch (JMSException e) {
logger.error("Error occurred pulling Message from Queue", e);
}
//Also worth noting, if I throw new RuntimeException("Aww Noos"); here then it won't take it from the queue, but it won't get consumed (or end up as dead letter)...
}
侦听器容器提供以下消息确认选项:
“sessionAcknowledgeMode”设置为“AUTO_ACKNOWLEDGE”(默认):监听器执行前自动消息确认;抛出异常时不会重新投递。
“sessionAcknowledgeMode”设置为“CLIENT_ACKNOWLEDGE”:监听器执行成功后自动消息确认;抛出异常时不会重新投递。“sessionAcknowledgeMode”设置为“DUPS_OK_ACKNOWLEDGE”:侦听器执行期间或之后延迟消息确认;如果抛出异常,可能会重新交付。
“sessionTransacted”设置为“true”:侦听器成功执行后的事务确认;在抛出异常的情况下保证重新交付。
找到了答案 如果您更改acknowledge =“transacted”并确保抛出new RuntimeException(“消息无法使用。回滚事务”),它看起来效果很好。在 OnMessage() 例程的末尾。
仍然不知道acknowledge =“client”实现了什么
JVM
在您的情况下,从 JMS 服务器的角度来看,客户端似乎请求了消息但从未确认,因此它仍然由客户端“正在处理”。在这种情况下,消息对于同一队列上的其他消费者来说是不可见的,因此您可能会觉得消息已“从队列中取出”,而事实上它仍然存在。显然你也不会在死消息队列中看到这样的消息。
我建议您阅读JMS规范以清楚地了解不同的确认模式。