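I'm trying to migrate the data from a ZooKeeper-based cluster (a single Kafka broker) to our new Kafka cluster, which runs on KRaft. To do this, I set up Kafka Connect and am using MirrorMaker. I managed to start Connect and create the connector between the clusters, and I can see that the topics have been created in the new cluster, but their old data isn't there.
I found the kafka-connect compose file from here. I don't understand why this Connect worker needs bootstrap servers of its own, but in any case I pointed it at my new cluster. This is my Connect config: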
version: '3'
x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.8.0
x-connect: &connect-vars
  CONNECT_BOOTSTRAP_SERVERS: "192.168.18.2:29093, 192.168.18.2:29094, 192.168.18.2:29095"
  CONNECT_GROUP_ID: cg_connect-jib
  CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
  CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
  CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
  # Cannot be higher than the number of brokers in the Kafka cluster
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
  # Defaults for all connectors
  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  # Where Jib places classes
  CONNECT_PLUGIN_PATH: /app/libs
  # Security Mechanism for SASL_PLAINTEXT
  CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
  CONNECT_SASL_MECHANISM: PLAIN
  CONNECT_SASL_JAAS_CONFIG: >
    org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret";
  # Additional debug options (optional for troubleshooting)
  CONNECT_OPTS: "-Djava.security.debug=gssloginconfig,configfile,configparser,logincontext"
  # Connect client overrides
  CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
  # Connect consumer overrides
  CONNECT_CONSUMER_MAX_POLL_RECORDS: 500
services:
  # Jib app
  connect-jib-1:
    image: *connect-image
    hostname: connect-jib-1
    ports:
      - '7083:8083'
    environment:
      <<: *connect-vars
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1
volumes:
  kafka_data:
    driver: local
I won't share my KRaft cluster config, because I'm confident it isn't the problem here. This is the connector config I'm using:
{
  "name": "zk",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "zk",
    "target.cluster.alias": "kraft",
    "source.cluster.bootstrap.servers": "192.168.2.18:29092",
    "target.cluster.bootstrap.servers": "192.168.18.2:29093,192.168.18.2:29094,192.168.18.2:29095",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "PLAIN",
    "cluster1->cluster2.enabled": true,
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';",
    "key.converter.class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter.class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
    "topics": ".*",
    "refresh.topics.enabled": true,
    "refresh.groups.enabled": true,
    "emit.checkpoints.enabled": true,
    "sync.group.offsets.enabled": true
  }
}
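A connector definition like the one above is registered through the Kafka Connect REST API (`POST /connectors`). The following is only a minimal sketch of that step, assuming the worker is reachable from the host at `http://localhost:7083` (the port published in the compose file) and that the JSON is saved as `zk-connector.json`. The file name and the helper names are hypothetical, not part of the setup described here.

```python
import json
import urllib.request

# Assumption: the worker's REST port 8083 is published on the host as 7083,
# as in the compose file above.
CONNECT_URL = "http://localhost:7083"


def build_create_request(base_url: str, connector: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def create_connector(base_url: str, connector: dict) -> dict:
    """Send the request and return the worker's JSON response."""
    with urllib.request.urlopen(build_create_request(base_url, connector)) as resp:
        return json.load(resp)


# Hypothetical usage, with the JSON above saved as zk-connector.json:
#   with open("zk-connector.json") as f:
#       print(create_connector(CONNECT_URL, json.load(f)))
```

Building the request separately from sending it makes it easy to inspect the URL and payload before anything reaches the worker.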
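Before creating the connector, I can see that the container named mm2_connect connects to my cluster successfully. After I create the connector, the topic names get replicated, and then the mm2_connect logs are spammed with these messages: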
[2024-12-18 12:27:45,917] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 919 due to node -3 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:45,917] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29095 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
[2024-12-18 12:27:47,113] INFO [Producer clientId=connector-producer-zk-0] Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:1017)
[2024-12-18 12:27:47,113] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 921 due to node -2 being disconnected (elapsed time since creation: 193ms, elapsed time since send: 193ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:47,113] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29094 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
[2024-12-18 12:27:48,415] INFO [Producer clientId=connector-producer-zk-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1017)
[2024-12-18 12:27:48,415] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 923 due to node -1 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:48,415] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
In the broker logs I also see these entries over and over:
[2024-12-18 12:29:00,977] INFO [SocketServer listenerType=BROKER, nodeId=11] Failed authentication with /192.168.32.1 (channelId=192.168.32.2:29093-192.168.32.1:35880-133) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2024-12-18 12:29:04,883] INFO [SocketServer listenerType=BROKER, nodeId=11] Failed authentication with /192.168.32.1 (channelId=192.168.32.2:29093-192.168.32.1:40748-134) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
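Am I misunderstanding the purpose of MirrorMaker 2? All I want is to migrate the existing data to the new cluster. I also know that this documentation exists, but I'm not sure how to apply it with Docker. Maybe it's simple, but I can't figure it out at the moment. That approach also requires restarting the cluster several times, which I'm trying to avoid. That's why I'm trying to get MM2 working. Any help is much appreciated.
According to the accepted answer here, I needed to configure security for the producers of the connectors that Kafka Connect runs. So I added these lines to the kafka-connect setup: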
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret";
Now data is being transferred from the old Kafka setup to the new one.
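Some background on why this helps: the top-level `security.protocol` and `sasl.*` worker settings configure the worker's own clients, while worker properties prefixed with `producer.` are passed to the producers used by source connector tasks. Without them the connector's producer talks plaintext to a SASL listener, which matches the "Unexpected Kafka request of type METADATA during SASL handshake" lines in the broker log. The sketch below shows how the `CONNECT_*` variable names would translate into worker properties, assuming the image follows the common convention of stripping `CONNECT_`, lowercasing, and turning underscores into dots. That convention is an assumption about this image, not something confirmed in this post, and the function name is hypothetical.

```python
def env_to_worker_property(name: str, prefix: str = "CONNECT_") -> str:
    """Translate an env var name to a worker property name.

    Assumed convention: drop the prefix, lowercase, underscores become dots.
    """
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    return name[len(prefix):].lower().replace("_", ".")


# Values taken from the compose settings in this post (JAAS config omitted).
env = {
    "CONNECT_SECURITY_PROTOCOL": "SASL_PLAINTEXT",           # worker's own clients
    "CONNECT_SASL_MECHANISM": "PLAIN",
    "CONNECT_PRODUCER_SECURITY_PROTOCOL": "SASL_PLAINTEXT",  # connector producers
    "CONNECT_PRODUCER_SASL_MECHANISM": "PLAIN",
}

props = {env_to_worker_property(key): value for key, value in env.items()}

for key, value in sorted(props.items()):
    print(f"{key}={value}")
```

Under that assumption, the `CONNECT_PRODUCER_*` variables end up as `producer.security.protocol` and `producer.sasl.mechanism` in the worker config, separate from the unprefixed worker-level keys.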