我有一个使用 Oracle AQ 的 spring-boot 服务。该服务接收 http 请求,将请求主体作为消息排队到 Oracle AQ 中,然后将其出队并通过 http 发送到目标服务,因此队列在这里就像一个缓冲区。
每条消息都包含一个字段,通过该字段我们可以了解它们是相互关联的。目标是消息入队的顺序应在组内保留,而不同组的消息可以并行处理。
所以,例如队列中有 5 条消息:A1-A2-A3-A4-A5。 A1和A3是相互关联的,所以我们必须确保消息A1会在A3之前被处理(不仅被监听,而且被处理)。因此,为了保留应用程序单个实例的组内顺序,我使用选择器:
制作人:
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
private final JmsTemplate jmsTemplate;
public MessageProducer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendMessage(RequestDto requestDto) {
jmsTemplate.convertAndSend(
"test_queue",
requestDto,
message -> {
message.setStringProperty("requestDtoGroup", requestDto.getGroup());
return message;
}
);
}
}
消费者:
import jakarta.jms.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class MessageConsumer {
@JmsListener(
destination = "test_queue",
selector = "requestDtoGroup = 'group_1'"
)
public void listenGroup1(Message message) {
log.info("Message with group_1" + message);
// further processing
}
@JmsListener(
destination = "test_queue",
selector = "requestDtoGroup = 'group_2'"
)
public void listenGroup2(Message message) {
log.info("Message with group_2" + message);
// further processing
}
}
这是我的 JMS 配置:
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import java.time.Duration;
import java.util.Properties;
@Configuration
@EnableJms
public class JmsConfiguration {
@Bean
public ConnectionFactory connectionFactory() throws JMSException {
Properties properties = new Properties();
properties.setProperty("user", "AQ_USER");
properties.setProperty("password", "your_password");
return AQjmsFactory.getQueueConnectionFactory(
"jdbc:oracle:thin:@localhost:1521/ORCLPDB1",
properties
);
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SomeHandler someHandler) throws JMSException {
var containerFactory = new DefaultJmsListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory());
containerFactory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
containerFactory.setSessionTransacted(true);
return containerFactory;
}
}
对于不同的选择器,消息是并行处理的,并且按预期工作。
但是如果应用程序的 2 个或更多实例正在运行,我就无法实现它。
如果我并行启动应用程序的另一个实例(即我使用复制粘贴的
MessageConsumer
类在本地启动另一个应用程序),则具有相同 requestDtoGroup
的消息(A1 和 A3)将被并行处理,我想避免。
即使有 2 个或更多应用程序实例,如何实现具有相同
requestDtoGroup
的消息按顺序处理?
到目前为止我尝试过的:
在 SO 的一些答案中,我遇到了查看消息分组的建议,但据我了解,它不适合我,因为 Oracle AQ 在同一事务中对消息进行分组,而我的消息是按逻辑分组的。
但我仍然尝试通过发送将
requestDtoGroup
值设置为 correlation ID
或 JMSXGroupID
通过jmsTemplate.convertAndSend
留言,但没有影响任何事情。
我还发现了这个问题,建议使用出队选项通过使消息的最旧的入队时间出队来进行搜索,但我不知道是否可以以某种方式将其设置为侦听器.
实际上,我使用 Kafka 实现了所需的行为,其中我使用单独的分区对主题内的消息进行分组,但我必须使用 Oracle AQ。也许看看 Oracle Transactional Event Queues 是有意义的,他们说它类似于 Kafka,但我会首先尝试使用简单的 AQ 来实现。
老实说,我认为你不应该采用这样的解决方案。
如果您关心可扩展性,那么您必须配置应用程序实例来处理不同的消息组。相反,尝试在组内分配消息处理。
但是,如果你想这样做:
选项1
您必须编写一个自定义组件,在缓存的帮助下跟踪组中的消息处理。仅当当前处理的组中没有消息时,消费者才会继续处理。如果消费者选择了消息或完成了消息的处理,它必须使用组的处理状态更新缓存。
由于跟踪消息的开销,不确定这将有多大的可扩展性。
选项2
在生产者中,尝试根据某些消息属性或逻辑进一步对消息进行分区。然后,使用消费者实例上的不同选择器动态配置消费者。但同样,您还必须进行一些跟踪:( 就像选项 1 中那样。
您可以使用 Kafka 实现相同的效果,因为您应用了额外的分区。然而,在 AQ 中,您可能必须在程序级别执行相同的操作。