我试图了解如何使用 ActiveMQ 引擎和 JMS 协议将 Amazon MQ 集成到我的应用程序中。主要目标是减慢某些进程的速度并在队列中按顺序执行它们。
我一直在探索 JMS 集成,因为它简化了侦听器的实现,以便在发送者将消息推送到队列时从队列异步接收消息。但是,我不清楚这些消息是否被一一消耗。我要求,对于每个消费者,一次只处理一条消息。这样,我可以跟踪整个节点集群在特定时刻正在处理的最大消息数,确保每个节点仅处理一条消息。
或者,至少,我需要一种方法来设置消费者同时处理的最大消息数。
这是我在每个节点上部署的消费者的代码示例:
@Singleton
public class MmfgActiveMqConsumer {
Logger LOG = LogManager.getLogger(MmfgActiveMqConsumer.class);
// Specify the connection parameters.
private final static String WIRE_LEVEL_ENDPOINT
= "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617";
private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";
@PostConstruct
public void init() {
// Start to listen to ActiveMq Queue
listenToQueue();
}
public void listenToQueue() {
// Configure connection and session
new Thread(() -> {
try {
final ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory();
receiveMessage(connectionFactory);
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}).start();
}
private static void receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException {
// Establish a connection for the consumer.
// Note: Consumers should not use PooledConnectionFactory.
final Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
// Create a session.
final Session consumerSession = consumerConnection
.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a queue named "MyQueue".
final Destination consumerDestination = consumerSession
.createQueue("MyQueue");
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession
.createConsumer(consumerDestination);
consumer.setMessageListener(new ActiveMqMessageListener());
// Thread sleep logic
//TODO
// Clean up the consumer.
consumer.close();
consumerSession.close();
}
private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
// Create a connection factory.
final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);
// Pass the sign-in credentials.
connectionFactory.setUserName(ACTIVE_MQ_USERNAME);
connectionFactory.setPassword(ACTIVE_MQ_PASSWORD);
return connectionFactory;
}
}
这是在消费者内部设置的监听器:
public class ActiveMqMessageListener implements MessageListener {
private Logger LOG = LogManager.getLogger(ActiveMqMessageListener.class);
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
String payload = textMessage.getText();
LOG.info("Message receive from ActiveMq Queue: " + payload);
} catch (JMSException e) {
LOG.error("Error receiving message from ActiveMq Queue: " + e);
}
}
}
}
每次将消息推送到队列时,都会调用
onMessage()
方法一次,并且下一条消息将在队列中等待,直到该方法执行完毕,这是否正确?
您建议如何实现此目标,或者 Amazon MQ 或 ActiveMQ 中是否有特定配置允许设置 JMS 使用者同时处理的消息数量的最大限制?任何见解或代码示例将不胜感激。
是的,onMessage()方法被一次调用一个。
如果您想要严格的每消费者 1 条消息设计,您将需要使用以下几个功能:
ActiveMQ.INDIVIDUAL_ACKNOWLEDGE 确认模式。当你在 onMessage() 中处理完成后,你就可以调用 message.acknowledge()
public void onMessage(Message message) {
... do things ..
message.acknowledge()
}
将预取设置为 0 或 1
每个消费者 1 条消息的常见用例是长时间运行的任务或工业应用程序。对于一般的企业数据处理来说,这不是一个实用的设计模式。 !!!请记住,由于等待来回通信的时间,这种设计“大大”降低了消耗性能。 ActiveMQ 的一大优势是它能够使用预取,从而克服了这种延迟。
考虑设计的另一种方式是每个消费者可能负责一批消息。例如,1,000。或 100。无论批量大小
适用于您的应用程序 - 使用它作为预取以保持消息移动。