多个消费者对 Oracle AQ 中逻辑分组的消息进行顺序处理

问题描述 投票:0回答:1

我有一个使用 Oracle AQ 的 spring-boot 服务。该服务接收 http 请求,将请求主体作为消息排队到 Oracle AQ 中,然后将其出队并通过 http 发送到目标服务,因此队列在这里就像一个缓冲区。

每条消息都包含一个字段,通过该字段我们可以了解它们是相互关联的。目标是消息入队的顺序应在组内保留,而不同组的消息可以并行处理。

所以,例如队列中有 5 条消息:A1-A2-A3-A4-A5。 A1和A3是相互关联的,所以我们必须确保消息A1会在A3之前被处理(不仅被监听,而且被处理)。因此,为了保留应用程序单个实例的组内顺序,我使用选择器:

制作人:

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    private final JmsTemplate jmsTemplate;

    public MessageProducer(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sendMessage(RequestDto requestDto) {
        jmsTemplate.convertAndSend(
                "test_queue",
                requestDto,
                message -> {
                    message.setStringProperty("requestDtoGroup", requestDto.getGroup());
                    return message;
                }
        );
    }
}

消费者:


import jakarta.jms.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class MessageConsumer {

    @JmsListener(
            destination = "test_queue",
            selector = "requestDtoGroup = 'group_1'"
    )
    public void listenGroup1(Message message) {
       log.info("Message with group_1" + message);
       // further processing
    }

    @JmsListener(
            destination = "test_queue",
            selector = "requestDtoGroup = 'group_2'"
    )
    public void listenGroup2(Message message) {
       log.info("Message with group_2" + message);
       // further processing
    }
}

这是我的 JMS 配置:

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

import java.time.Duration;
import java.util.Properties;

@Configuration
@EnableJms
public class JmsConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() throws JMSException {
        Properties properties = new Properties();
        properties.setProperty("user", "AQ_USER");
        properties.setProperty("password", "your_password");

        return AQjmsFactory.getQueueConnectionFactory(
                "jdbc:oracle:thin:@localhost:1521/ORCLPDB1",
                properties
        );
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SomeHandler someHandler) throws JMSException {
        var containerFactory = new DefaultJmsListenerContainerFactory();

        containerFactory.setConnectionFactory(connectionFactory());

        containerFactory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        containerFactory.setSessionTransacted(true);

        return containerFactory;
    }
}

对于不同的选择器,消息是并行处理的,并且按预期工作。

但是如果应用程序的 2 个或更多实例正在运行,我就无法实现它。

如果我并行启动应用程序的另一个实例(即我使用复制粘贴的

MessageConsumer
类在本地启动另一个应用程序),则具有相同
requestDtoGroup
的消息(A1 和 A3)将被并行处理,我想避免。

即使有 2 个或更多应用程序实例,如何实现具有相同

requestDtoGroup
的消息按顺序处理?

到目前为止我尝试过的:

  1. 在 SO 的一些答案中,我遇到了查看消息分组的建议,但据我了解,它不适合我,因为 Oracle AQ 在同一事务中对消息进行分组,而我的消息是按逻辑分组的。

  2. 但我仍然尝试通过发送将

    requestDtoGroup
    值设置为
    correlation ID
    JMSXGroupID
    通过
    jmsTemplate.convertAndSend
    留言,但没有影响任何事情。

  3. 我还发现了这个问题,建议使用出队选项通过使消息的最旧的入队时间出队来进行搜索,但我不知道是否可以以某种方式将其设置为侦听器.

实际上,我使用 Kafka 实现了所需的行为,其中我使用单独的分区对主题内的消息进行分组,但我必须使用 Oracle AQ。也许看看 Oracle Transactional Event Queues 是有意义的,他们说它类似于 Kafka,但我会首先尝试使用简单的 AQ 来实现。

java spring oracle spring-boot oracle-aq
1个回答
0
投票

老实说,我认为你不应该采用这样的解决方案。

如果您关心可扩展性,那么您必须配置应用程序实例来处理不同的消息组。相反,尝试在组内分配消息处理。

但是,如果你想这样做:

选项1

您必须编写一个自定义组件,在缓存的帮助下跟踪组中的消息处理。仅当当前处理的组中没有消息时,消费者才会继续处理。如果消费者选择了消息或完成了消息的处理,它必须使用组的处理状态更新缓存。

由于跟踪消息的开销,不确定这将有多大的可扩展性。

选项2

在生产者中,尝试根据某些消息属性或逻辑进一步对消息进行分区。然后,使用消费者实例上的不同选择器动态配置消费者。但同样,您还必须进行一些跟踪:( 就像选项 1 中那样。

您可以使用 Kafka 实现相同的效果,因为您应用了额外的分区。然而,在 AQ 中,您可能必须在程序级别执行相同的操作。

© www.soinside.com 2019 - 2024. All rights reserved.