我正在使用 Spring JMS 和 ActiveMQ 使用 JMS 主题(发布/订阅)将消息从发送者发送到多个侦听器。至此所有监听者都可以接收到来自发送者的消息。但我想添加一个功能,当特定侦听器(例如侦听器 1)收到消息时,侦听器 1 将向发送者发送收据确认。我按照我的旧帖子中的评论,在发送者中创建了一个
TemporaryQueue
,并在发送者和接收者中使用ReplyTo来获取从侦听器到发送者的确认消息。
我的发件人类别是:
public class CustomerStatusSender {
private JmsTemplate jmsTemplate;
private Topic topic;
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void setTopic(Topic topic) {
this.topic = topic;
}
public void simpleSend(final String customerStatusMessage) {
jmsTemplate.send(topic, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("hello world");
message.setStringProperty("content", customerStatusMessage);
message.setIntProperty("count", 10);
//send acknowledge request to a listener via a tempQueue
Destination tempQueue = session.createTemporaryQueue();
message.setJMSCorrelationID("replyMessage");
message.setJMSReplyTo(tempQueue);
return message;
}
});
}
}
发送者创建一个
TemporaryQueue
供监听者发回确认消息。然后在其中一个监听器中,我有以下代码将确认消息发送回发送者:
public class CustomerStatusListener implements SessionAwareMessageListener<Message> {
public void onMessage(Message message, Session session) {
if (message instanceof TextMessage) {
try {
System.out.println("Subscriber 1 got you! The message is: "
+ message.getStringProperty("content"));
//create a receipt confirmation message and send it back to the sender
Message response = session.createMessage();
response.setJMSCorrelationID(message.getJMSCorrelationID());
response.setBooleanProperty("Ack", true);
TemporaryQueue tempQueue = (TemporaryQueue) message.getJMSReplyTo();
MessageProducer producer = session.createProducer(tempQueue);
producer.send(tempQueue, response);
} catch (JMSException ex) {
throw new RuntimeException(ex);
}
} else {
throw new IllegalArgumentException(
"Message must be of type TextMessage");
}
}
}
但是,我发现 Listener 类中的以下行抛出错误:
TemporaryQueue tempQueue = (TemporaryQueue) message.getJMSReplyTo();
MessageProducer producer = session.createProducer(tempQueue);
异常错误说:
The destination temp-queue://ID:xyz-1385491-1:2:1 does not exist.
那么这里出了什么问题呢?我假设发送者创建的
tempQueue
可用于同一 JMS 会话中的侦听器。为什么调用 tempQueue
后的 message.getJMSReplyTo()
对象没有返回有效的 TemporaryQueue
?
另一个问题是:如何在发送方收到确认消息?我是否应该在发送方中实现
MessageListener
接口才能接收侦听器的确认?或者我应该调用 receive()
方法来同步接收它?
感谢您的建议!
如果您使用 spring-jms,为什么不直接使用 MessageListenerAdapter 作为监听器呢? - 他会处理replyTo的事情,你的听众可以是一个简单的POJO。
无论如何,你不需要将其强制转换到临时队列;就听众而言,这只是一个目的地。
查看 MessageListenerAdapter` javadocs。
此外,您需要在发送端的临时队列上创建一个消费者,以接收回复。如果发送连接关闭,临时队列将消失。
我最终使用单独的 JMS 队列将确认消息从监听器 1 发送到发送者。由于某种原因,ActiveMQ 创建的临时队列在 JMS 会话期间不可用。