RabbitMQ 超级流 |使用 RabbitMQ 的 Spring Cloud Stream 消费者组

问题描述 投票:0回答:1

我有 2 个服务(服务 A 和服务 B)需要使用相同的数据。我可以通过使用队列类型作为流来实现这一点。但同时,在自动缩放期间,我不希望服务 A1 和服务 A2(2 个实例)消耗相同的数据。

我发现通过实现超级流我可以实现这一点,但我没有找到任何好的例子来解决并且使用 Spring Cloud Stream 也是可能的,但再次无法找到正确的资源。

任何人都可以为我提供一个好的资源或示例来使用rabbitmq实现此目的吗?

spring-boot rabbitmq message-queue spring-cloud-stream rabbitmq-stream
1个回答
0
投票

我不知道任何 spring-cloud-stream 示例,但一般来说,SCSt 应用程序对底层传输是不可知的(这是其目标之一)。

但是,Spring AMQP 项目文档有如何使用 spring-rabbit-stream 库的示例(适用于普通流和超级流)。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#stream-support

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}

(您需要阅读 Spring AMQP 文档的其余部分以了解侦听器容器是什么)。

© www.soinside.com 2019 - 2024. All rights reserved.