我正在尝试使用 Mirror Maker 将主题从一个 Kafka 集群复制到另一个集群。
我正在使用 docker compose 创建以下资源:
---
# Two single-broker Kafka clusters (broker1+zookeeper, broker2+zookeeper2),
# a Schema Registry attached to the target cluster, and a Kafka Connect
# worker used to run MirrorMaker 2 as a source connector.
networks:
  local_kafka:
    name: local_kafka

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.2
    container_name: zookeeper
    networks:
      - local_kafka
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
      # For easier local debugging of zookeeper issues.
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"

  zookeeper2:
    image: confluentinc/cp-zookeeper:7.5.2
    container_name: zookeeper2
    networks:
      - local_kafka
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
      # For easier local debugging of zookeeper issues.
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"

  broker1:
    image: confluentinc/cp-kafka:7.5.2
    container_name: broker1
    networks:
      - local_kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_BOOTSTRAP_SERVERS: broker1:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker1:9092,CONNECTIONS_FROM_HOST://localhost:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single-broker cluster: every internal topic must use replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_MIN_INSYNC_REPLICAS: "1"
      ALLOW_PLAINTEXT_LISTENERS: "true"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_RETRIES: "3"

  broker2:
    image: confluentinc/cp-kafka:7.5.2
    container_name: broker2
    networks:
      - local_kafka
    ports:
      - "19093:19093"
    depends_on:
      - zookeeper2
    environment:
      KAFKA_BROKER_ID: "2"
      KAFKA_BOOTSTRAP_SERVERS: broker2:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper2:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:9092,CONNECTIONS_FROM_HOST://localhost:19093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single-broker cluster: every internal topic must use replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_MIN_INSYNC_REPLICAS: "1"
      KAFKA_SCHEMA_REGISTRY_URL: schemaregistry:8081
      ALLOW_PLAINTEXT_LISTENERS: "true"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_RETRIES: "3"

  schemaregistry:
    image: confluentinc/cp-schema-registry:7.5.2
    container_name: schemaregistry
    networks:
      - local_kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker2:9092
      SCHEMA_REGISTRY_DEBUG: "true"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _registry-schemas
    ports:
      - "8081:8081"

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:7.6.1
    container_name: kafka-connect
    networks:
      - local_kafka
    depends_on:
      - broker1
      - broker2
      - schemaregistry
      - zookeeper
      - zookeeper2
    environment:
      CONNECT_BOOTSTRAP_SERVERS: PLAINTEXT://broker2:9092
      CONNECT_SCHEMA_REGISTRY_URL: http://schemaregistry:8081
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.protobuf.ProtobufConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry:8081
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      # FIX: was "2". The worker's bootstrap cluster (broker2) has a single
      # broker, so the Connect internal topics (_connect-configs/offsets/status)
      # cannot be created with replication factor 2 — topic creation fails, the
      # worker never becomes healthy, and no connector tasks are started.
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/local/share/kafka/plugins,/usr/share/filestream-connectors
      KAFKA_LOG4J_OPTS: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
    volumes:
      - ./log4j.properties:/etc/kafka/log4j.properties
    # Install the Protobuf converter at startup, then launch the worker in the
    # background; `sleep infinity` keeps the container alive.
    command:
      - bash
      - -c
      - |
        echo "Installing Protobuf Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-protobuf-converter:7.3.3
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    ports:
      - "8083:8083"
我之后使用以下 Kafka Connect 属性:
{
"name": "local-to-remote-mirror",
"config":{
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"tasks.max": "2",
"topics": ".*",
"replication.policy.separator": "",
"source.cluster.alias": "",
"target.cluster.alias": "",
"source.cluster.bootstrap.servers": "http://broker1:9092",
"target.cluster.bootstrap.servers": "http://broker2:9092",
"offset-syncs.topic.location": "target",
"offset-syncs.topic.replication.factor": "2",
"sync.topic.acls.enabled": false,
"sync.topic.configs.enabled": false,
"replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
"_comment": "Kafka Connect converter used to deserialize keys",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"_comment": "Kafka Connect converter used to deserialize values",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}
然而，当我向 broker1 中的主题发布消息时，broker2 中没有任何反应。我通过 Kafka Connect UI 注意到没有任务正在运行:
是否缺少某些东西才能使此复制工作正常?
提前谢谢您。
我能够通过以下配置解决它:
{
"name": "local-to-remote-mirror",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"tasks.max": "2",
"topics": "^feedzai-data-.*",
"replication.policy.separator": "",
"source.cluster.alias": "",
"source.cluster.bootstrap.servers": "http://broker1:9092",
"target.cluster.bootstrap.servers": "http://broker2:9092",
"offset-syncs.topic.replication.factor": "1",
"sync.topic.acls.enabled": "false",
"sync.topic.configs.enabled": "false",
"replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
"replication.factor": "1",
"errors.log.enable": true,
"_comment": "Kafka Connect converter used to deserialize keys",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"_comment": "Kafka Connect converter used to deserialize values",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"offset-syncs.topic.location": "target"
}
}
基本上删除了 target.cluster.alias，并添加了 "replication.factor": "1" 和 "offset-syncs.topic.replication.factor": "1"。由于每个集群上只有一个代理（broker），默认的复制因子无法得到满足。