在带有 JMS 的 ActiveMQ 的 Amazon MQ 中配置最大并发消息处理

问题描述 投票:0回答:1

我试图了解如何使用 ActiveMQ 引擎和 JMS 协议将 Amazon MQ 集成到我的应用程序中。主要目标是减慢某些进程的速度并在队列中按顺序执行它们。

我一直在探索 JMS 集成,因为它简化了侦听器的实现,以便在发送者将消息推送到队列时从队列异步接收消息。但是,我不清楚这些消息是否被一一消耗。我要求,对于每个消费者,一次只处理一条消息。这样,我可以跟踪整个节点集群在特定时刻正在处理的最大消息数,确保每个节点仅处理一条消息。

或者,至少,我需要一种方法来设置消费者同时处理的最大消息数。

这是我在每个节点上部署的消费者的代码示例:

@Singleton
public class MmfgActiveMqConsumer {

    Logger LOG = LogManager.getLogger(MmfgActiveMqConsumer.class);

    // Specify the connection parameters.
    private final static String WIRE_LEVEL_ENDPOINT
            = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617";
    private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
    private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

    @PostConstruct
    public void init() {
        // Start to listen to ActiveMq Queue
        listenToQueue();
    }

    public void listenToQueue() {
        // Configure connection and session

        new Thread(() -> {
            try {
                final ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory();
                receiveMessage(connectionFactory);
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

    private static void receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException {
        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        final Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        final Session consumerSession = consumerConnection
                .createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a queue named "MyQueue".
        final Destination consumerDestination = consumerSession
                .createQueue("MyQueue");

        // Create a message consumer from the session to the queue.
        final MessageConsumer consumer = consumerSession
                .createConsumer(consumerDestination);

        consumer.setMessageListener(new ActiveMqMessageListener());

        // Thread sleep logic
        //TODO
        // Clean up the consumer.
        consumer.close();
        consumerSession.close();

    }

    private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
        // Create a connection factory.
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);

        // Pass the sign-in credentials.
        connectionFactory.setUserName(ACTIVE_MQ_USERNAME);
        connectionFactory.setPassword(ACTIVE_MQ_PASSWORD);
        return connectionFactory;
    }
}

这是在消费者内部设置的监听器:

public class ActiveMqMessageListener implements MessageListener {

    private Logger LOG = LogManager.getLogger(ActiveMqMessageListener.class);

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage textMessage) {
            try {
                String payload = textMessage.getText();
                LOG.info("Message receive from ActiveMq Queue: " + payload);
            } catch (JMSException e) {
                LOG.error("Error receiving message from ActiveMq Queue: " + e);
            }
        }
    }
}

每次将消息推送到队列时,都会调用

onMessage()
方法一次,并且下一条消息将在队列中等待,直到该方法执行完毕,这是否正确?

您建议如何实现此目标,或者 Amazon MQ 或 ActiveMQ 中是否有特定配置允许设置 JMS 使用者同时处理的消息数量的最大限制?任何见解或代码示例将不胜感激。

java spring-mvc jms activemq amazon-mq
1个回答
0
投票

是的,onMessage()方法被一次调用一个。

如果您想要严格的每消费者 1 条消息设计,您将需要使用以下几个功能:

  1. ActiveMQ.INDIVIDUAL_ACKNOWLEDGE 确认模式。当你在 onMessage() 中处理完成后,你就可以调用 message.acknowledge()

     public void onMessage(Message message) {
        ... do things ..
        message.acknowledge()
     }
    
  2. 将预取设置为 0 或 1

每个消费者 1 条消息的常见用例是长时间运行的任务或工业应用程序。对于一般的企业数据处理来说,这不是一个实用的设计模式。 !!!请记住,由于等待来回通信的时间,这种设计“大大”降低了消耗性能。 ActiveMQ 的一大优势是它能够使用预取,从而克服了这种延迟。

考虑设计的另一种方式是每个消费者可能负责一批消息。例如,1,000。或 100。无论批量大小

适用于您的应用程序 - 使用它作为预取以保持消息移动。

© www.soinside.com 2019 - 2024. All rights reserved.