来自单个消费者的 Spring Cloud Stream 多集群和多输入绑定

问题描述 投票:0回答:1

在我的项目中,我需要连接到两个不同的 Kafka 代理并消费两个主题的所有事件,每个 Kafka 代理上有一个主题。

我的

application.yaml
看起来有点像这样:

spring:
    cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two # I changed this binder between kafka-one and kafka-two manually for tests, the orderProcessedListener-in-1 binding doesn't have the exclusive producer
        orderProcessedListener-in-0: # CONSUME FROM KAFKA ONE
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedListener-in-1: # CONSUMER FROM KAFKA TWO
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-1:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"

但是当我运行应用程序时它不起作用,Kafka One 的消费者绑定正常工作

orderProcessedListener-in-0
,但是 Kafka Two 的另一个消费者绑定不起作用
orderProcessedListener-in-1

我正在使用:

  • Spring Boot 3.3.0
  • 春云2023.0.1

我的两个 Kafka 集群在带有 docker 容器的开发环境中运行良好,一个暴露在 9092 端口上,另一个暴露在 9093 端口上。

架构示例: project architecture example

Kafka 与所有主题的所有签名消费者合一: enter image description here

Kafka 2 在所有主题上都没有从未签署过的消费者: enter image description here

我们该如何调整?

spring-boot apache-kafka spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka
1个回答
0
投票

解决此问题的简单方法是使用两个指向唯一入口点方法的 bean。

示例:


@Component
public class OrderProcessedListener {

    public void consume(final Message<Order> message) {
        // your business logic to process message
    }

    @Bean
    public Consumer<Message<Order>> orderProcessedFromKafkaOneListener() {
        return this::consume;
    }

    @Bean
    public Consumer<Message<Order>> orderProcessedFromKafkaTwoListener() {
        return this::consume;
    }

}

属性配置应该是:

spring:
    cloud:
    function:
      definition: orderCreatedListener;orderProcessedFromKafkaOneListener;orderProcessedFromKafkaTwoListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedFromKafkaOneListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedFromKafkaTwoListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedFromKafkaOneListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedFromKafkaTwoListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"

任何时候最好的解决方案都是使用反应函数。

© www.soinside.com 2019 - 2024. All rights reserved.