我有 2 个服务(服务 A 和服务 B)需要使用相同的数据。我可以通过使用队列类型作为流来实现这一点。但同时,在自动缩放期间,我不希望服务 A1 和服务 A2(2 个实例)消耗相同的数据。
我发现通过实现超级流我可以实现这一点,但我没有找到任何好的例子来解决并且使用 Spring Cloud Stream 也是可能的,但再次无法找到正确的资源。
任何人都可以为我提供一个好的资源或示例来使用rabbitmq实现此目的吗?
我不知道任何 spring-cloud-stream 示例,但一般来说,SCSt 应用程序对底层传输是不可知的(这是其目标之一)。
但是,Spring AMQP 项目文档有如何使用 spring-rabbit-stream 库的示例(适用于普通流和超级流)。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#stream-support
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
(您需要阅读 Spring AMQP 文档的其余部分以了解侦听器容器是什么)。