Minimal MirrorSourceConnector configuration to replicate data to another Kafka instance


I am trying to replicate topics from one Kafka cluster to another using MirrorMaker.

I am creating the following resources with Docker Compose:

networks:
  local_kafka:
    name: local_kafka

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.2
    container_name: zookeeper
    networks:
      - local_kafka
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*" # For easier local debugging of zookeeper issues.

  zookeeper2:
    image: confluentinc/cp-zookeeper:7.5.2
    container_name: zookeeper2
    networks:
      - local_kafka
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*" # For easier local debugging of zookeeper issues.

  broker1:
    image: confluentinc/cp-kafka:7.5.2
    container_name: broker1
    networks:
      - local_kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_BOOTSTRAP_SERVERS: broker1:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker1:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
      ALLOW_PLAINTEXT_LISTENERS: "true"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_RETRIES: 3

  broker2:
    image: confluentinc/cp-kafka:7.5.2
    container_name: broker2
    networks:
      - local_kafka
    ports:
      - "19093:19093"
    depends_on:
      - zookeeper2
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_BOOTSTRAP_SERVERS: broker2:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper2:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:9092,CONNECTIONS_FROM_HOST://localhost:19093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
      KAFKA_SCHEMA_REGISTRY_URL: schemaregistry:8081
      ALLOW_PLAINTEXT_LISTENERS: "true"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_RETRIES: 3

  schemaregistry:
    image: confluentinc/cp-schema-registry:7.5.2
    container_name: schemaregistry
    networks:
      - local_kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker2:9092
      SCHEMA_REGISTRY_DEBUG: "true"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _registry-schemas
    ports:
      - "8081:8081"

  kafka-connect:
      image: confluentinc/cp-kafka-connect-base:7.6.1
      container_name: kafka-connect
      networks:
        - local_kafka
      depends_on:
        - broker1
        - broker2
        - schemaregistry
        - zookeeper
        - zookeeper2
      environment:
        CONNECT_BOOTSTRAP_SERVERS: PLAINTEXT://broker2:9092
        CONNECT_SCHEMA_REGISTRY_URL: http://schemaregistry:8081
        CONNECT_REST_PORT: 8083
        CONNECT_GROUP_ID: kafka-connect
        CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
        CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
        CONNECT_STATUS_STORAGE_TOPIC: _connect-status
        CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
        CONNECT_VALUE_CONVERTER: io.confluent.connect.protobuf.ProtobufConverter
        CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry:8081
        CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "2"
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "2"
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "2"
        CONNECT_PLUGIN_PATH: /usr/local/share/kafka/plugins,/usr/share/filestream-connectors
        KAFKA_LOG4J_OPTS: -Dlog4j.configuration=file:/etc/kafka/log4j.properties

      volumes:
        - ./log4j.properties:/etc/kafka/log4j.properties
      command:
        - bash
        - -c
        - |
          echo "Installing Protobuf Connector"
          confluent-hub install --no-prompt confluentinc/kafka-connect-protobuf-converter:7.3.3
          echo "Launching Kafka Connect worker"
          /etc/confluent/docker/run &
          #
          sleep infinity
      ports:
        - "8083:8083"

I then register the connector with the following Kafka Connect configuration:

{
    "name": "local-to-remote-mirror",
    "config":{
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "tasks.max": "2",
        "topics": ".*",
        "replication.policy.separator": "",
        "source.cluster.alias": "",
        "target.cluster.alias": "",
        "source.cluster.bootstrap.servers": "http://broker1:9092",
        "target.cluster.bootstrap.servers": "http://broker2:9092",
        "offset-syncs.topic.location": "target",
        "offset-syncs.topic.replication.factor": "2",
        "sync.topic.acls.enabled": false,
        "sync.topic.configs.enabled": false,
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",

        "_comment": "Kafka Connect converter used to deserialize keys",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

        "_comment": "Kafka Connect converter used to deserialize values",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
    }
}
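
For completeness, this is how the configuration above gets registered with the Connect worker over its REST API (assuming the 8083 port mapping from the compose file, and that the JSON is saved as mirror-connector.json):

# Register the connector with the Kafka Connect REST API.
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mirror-connector.json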

However, when I publish to a topic in broker1, nothing happens in broker2. I also noticed through the Kafka Connect UI that no tasks are running.
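
The same information is available without the UI; the worker's REST API reports the connector and task state (and any task-level errors) for the name registered above:

# Inspect the connector's state and any failed tasks.
curl -s http://localhost:8083/connectors/local-to-remote-mirror/status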

Is something missing to make this replication work?

Thanks in advance.

apache-kafka apache-kafka-connect apache-kafka-mirrormaker
1 Answer

I was able to solve it with the following configuration:

{
    "name": "local-to-remote-mirror",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "tasks.max": "2",
        "topics": "^feedzai-data-.*",
        "replication.policy.separator": "",
        "source.cluster.alias": "",
        "source.cluster.bootstrap.servers": "http://broker1:9092",
        "target.cluster.bootstrap.servers": "http://broker2:9092",
        "offset-syncs.topic.replication.factor": "1",
        "sync.topic.acls.enabled": "false",
        "sync.topic.configs.enabled": "false",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "replication.factor": "1",
        "errors.log.enable": true,
        "_comment": "Kafka Connect converter used to deserialize keys",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "_comment": "Kafka Connect converter used to deserialize values",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "offset-syncs.topic.location": "target"
    }
}
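
If the connector is already registered, the corrected settings can be applied in place with a PUT to the config endpoint (the file working-mirror-config.json here is assumed to contain only the inner "config" object, per the Connect REST API contract):

# Update the existing connector's configuration in place.
curl -s -X PUT http://localhost:8083/connectors/local-to-remote-mirror/config \
  -H "Content-Type: application/json" \
  -d @working-mirror-config.json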

Basically, I removed target.cluster.alias and added "replication.factor": "1" and "offset-syncs.topic.replication.factor": "1". Since each cluster has only a single broker, the original replication factor of 2 could not be satisfied.
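
As a quick end-to-end check (a sketch assuming a topic such as feedzai-data-test matching the ^feedzai-data-.* pattern already exists on broker1; with IdentityReplicationPolicy and an empty separator the topic keeps the same name on the target cluster):

# Produce a record on the source cluster...
echo "hello" | docker exec -i broker1 kafka-console-producer \
  --bootstrap-server broker1:9092 --topic feedzai-data-test

# ...and read it back from the target cluster once the mirror task has run.
docker exec broker2 kafka-console-consumer \
  --bootstrap-server broker2:9092 --topic feedzai-data-test \
  --from-beginning --max-messages 1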
