Unable to migrate old data from a ZooKeeper cluster to a KRaft cluster with MirrorMaker 2


I am trying to transfer the data of our ZooKeeper-based Kafka cluster (a single broker) to our new KRaft cluster. To do this I set up Kafka Connect and use MirrorMaker. I can start Connect and create the connector between the clusters, and I can see that the topics get created on the new cluster, but their old data is not there.

I took the kafka-connect compose file from here. I don't understand why this Connect worker needs bootstrap servers, but in any case I pointed it at my new cluster. Here is my Connect configuration:

version: '3'

x-connect-image: &connect-image cricketeerone/apache-kafka-connect:3.8.0

x-connect: &connect-vars
  CONNECT_BOOTSTRAP_SERVERS: "192.168.18.2:29093, 192.168.18.2:29094, 192.168.18.2:29095"

  CONNECT_GROUP_ID: cg_connect-jib
  CONNECT_CONFIG_STORAGE_TOPIC: connect-jib_config
  CONNECT_OFFSET_STORAGE_TOPIC: connect-jib_offsets
  CONNECT_STATUS_STORAGE_TOPIC: connect-jib_status
  # Cannot be higher than the number of brokers in the Kafka cluster
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
  # Defaults for all connectors
  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
  # Where Jib places classes
  CONNECT_PLUGIN_PATH: /app/libs
  
  # Security Mechanism for SASL_PLAINTEXT
  CONNECT_SECURITY_PROTOCOL: SASL_PLAINTEXT
  CONNECT_SASL_MECHANISM: PLAIN
  CONNECT_SASL_JAAS_CONFIG: >
    org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret";

  # Additional debug options (optional for troubleshooting)
  CONNECT_OPTS: "-Djava.security.debug=gssloginconfig,configfile,configparser,logincontext"

  # Connect client overrides
  CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: 30000
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 900000
  # Connect consumer overrides
  CONNECT_CONSUMER_MAX_POLL_RECORDS: 500

services:

  # Jib app
  connect-jib-1:
    image: *connect-image
    hostname: connect-jib-1
    ports:
      - '7083:8083'
    environment:
      <<: *connect-vars
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-1

volumes:
  kafka_data:
    driver: local
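
For reference, once the stack is up the worker can be sanity-checked through its REST API on host port 7083 (mapped from the container's 8083 above). These are the standard Kafka Connect REST endpoints; the commands are only a rough sketch:

# Start the Connect worker service defined above
docker compose up -d connect-jib-1

# The root endpoint returns the worker version; /connector-plugins lists the
# installed plugins (MirrorSourceConnector should appear in that list)
curl -s http://localhost:7083/
curl -s http://localhost:7083/connector-plugins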

I won't share my KRaft cluster configuration, since I am confident it is not the problem here. This is the connector configuration I am using:

{
  "name": "zk",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "zk",
    "target.cluster.alias": "kraft",
    "source.cluster.bootstrap.servers": "192.168.2.18:29092",
    "target.cluster.bootstrap.servers": "192.168.18.2:29093,192.168.18.2:29094,192.168.18.2:29095",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "PLAIN",
    "cluster1->cluster2.enabled": true,
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';",
    "key.converter.class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter.class": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
    "topics": ".*",
    "refresh.topics.enabled": true,
    "refresh.groups.enabled": true,
    "emit.checkpoints.enabled": true,
    "sync.group.offsets.enabled": true
  }
}
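
For completeness, this is roughly how I register the connector, by POSTing the JSON above to the worker's REST API (the file name zk.json is just a placeholder for wherever the config is saved):

# Create the MirrorSourceConnector from the config above (saved as zk.json here)
curl -s -X POST -H "Content-Type: application/json" \
     --data @zk.json \
     http://localhost:7083/connectors

# Verify that the connector and its tasks report RUNNING
curl -s http://localhost:7083/connectors/zk/status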

Before I create the connector, I can see that the container named mm2_connect connects to my cluster successfully. After the connector is created, the topic names get replicated, and then mm2_connect is spammed with these messages:
[2024-12-18 12:27:45,917] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 919 due to node -3 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:45,917] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29095 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
[2024-12-18 12:27:47,113] INFO [Producer clientId=connector-producer-zk-0] Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:1017)
[2024-12-18 12:27:47,113] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 921 due to node -2 being disconnected (elapsed time since creation: 193ms, elapsed time since send: 193ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:47,113] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29094 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)
[2024-12-18 12:27:48,415] INFO [Producer clientId=connector-producer-zk-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1017)
[2024-12-18 12:27:48,415] INFO [Producer clientId=connector-producer-zk-0] Cancelled in-flight METADATA request with correlation id 923 due to node -1 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, throttle time: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:363)
[2024-12-18 12:27:48,415] WARN [Producer clientId=connector-producer-zk-0] Bootstrap broker 192.168.18.2:29093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1173)

Also, in the broker logs I keep seeing these messages over and over:

[2024-12-18 12:29:00,977] INFO [SocketServer listenerType=BROKER, nodeId=11] Failed authentication with /192.168.32.1 (channelId=192.168.32.2:29093-192.168.32.1:35880-133) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
[2024-12-18 12:29:04,883] INFO [SocketServer listenerType=BROKER, nodeId=11] Failed authentication with /192.168.32.1 (channelId=192.168.32.2:29093-192.168.32.1:40748-134) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

Am I misunderstanding the purpose of MirrorMaker 2? All I want is to migrate the existing data to the new cluster. I also know that this documentation exists, but I am not sure how to apply it with Docker; maybe it is simple, but right now I cannot make sense of it. That approach would also require me to restart the cluster several times, which I am trying to avoid, and that is why I am trying to get MM2 to work. Any help is greatly appreciated.

apache-kafka apache-kafka-mirrormaker
1 Answer

According to the accepted answer here, I needed to configure security for the producers of the connectors that Kafka Connect runs, so I added these lines to the kafka-connect setup:

  CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_PLAINTEXT
  CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
  CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret";

Now the data is being transferred from the old Kafka setup to the new one.
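
To double-check that the historical records actually arrived, here is a quick sketch of reading a mirrored topic directly from the new cluster with the console consumer (my-topic is just a placeholder for one of the replicated topics; the SASL settings reuse the same credentials as above):

# Client properties for the SASL_PLAINTEXT listener of the new cluster
cat > client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
EOF

# Read a replicated topic from the beginning ("my-topic" is a placeholder)
kafka-console-consumer.sh --bootstrap-server 192.168.18.2:29093 \
  --topic my-topic --from-beginning \
  --consumer.config client.properties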
