How do I run Debezium with the Connect platform?

I have an EC2 machine with a MySQL DB on it (not containerized). I now want to stream CDC events to Confluent-managed Kafka. I'm using this docker-compose file to start the Connect platform on the same host.

services:
  connect:
    image: confluentinc/cp-kafka-connect:latest
    network_mode: host
    env_file:
      - .env
    ports:
      - "8083:8083"
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.2
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run & 
        #
        sleep infinity
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "${KAFKA_BOOTSTRAP_SERVER}"
      CONNECT_TOPIC_CREATION_ENABLE: "false"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SASL_USERNAME: "${KAFKA_API_KEY}"
      CONNECT_SASL_PASSWORD: "${KAFKA_API_SECRET}"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"

      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"

      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"

      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_CONFIG_STORAGE_TOPIC: "${CONFIG_STORAGE_TOPIC}"
      CONNECT_OFFSET_STORAGE_TOPIC: "${OFFSET_STORAGE_TOPIC}"
      CONNECT_STATUS_STORAGE_TOPIC: "${STATUS_STORAGE_TOPIC}"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
    volumes:
      - .env:/data/credentials.properties

Now, this part seems to work. I don't get any errors here, and after running it, the meta topics I created for config, offsets, and status get populated.
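
A quick way to sanity-check this, assuming the worker's REST API is reachable on the default port 8083, is to ask the worker for its installed plugins and confirm the Debezium connector is listed:

# Worker liveness and version info
curl -s http://localhost:8083/
# io.debezium.connector.mysql.MySqlConnector should appear in this list
curl -s http://localhost:8083/connector-plugins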

Now I'm creating the CDC connector with an API call (sketched after the config below) and this config:

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "debezium",
    "database.server.id": 1,
    "database.include.list": "mydb",
    "snapshot.mode": "schema_only",
    "topic.prefix": "analytics01",
    "table.include.list": "mydb.table1,mydb.table2",
    "schema.history.internal.kafka.bootstrap.servers": "${file:/data/credentials.properties:KAFKA_BOOTSTRAP_SERVER}",
    "schema.history.internal.kafka.topic": "${file:/data/credentials.properties:SERVER_NAME}.schemaChanges",
    "schema.history.consumer.security.protocol": "SASL_SSL",
    "schema.history.consumer.ssl.endpoint.identification.algorithm": "https",
    "schema.history.consumer.sasl.mechanism": "PLAIN",
    "schema.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:KAFKA_API_KEY}\" password=\"${file:/data/credentials.properties:KAFKA_API_SECRET}\";",
    "schema.history.producer.security.protocol": "SASL_SSL",
    "schema.history.producer.ssl.endpoint.identification.algorithm": "https",
    "schema.history.producer.sasl.mechanism": "PLAIN",
    "schema.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:KAFKA_API_KEY}\" password=\"${file:/data/credentials.properties:KAFKA_API_SECRET}\";",
    "decimal.handling.mode":"double",
    "transforms": "InsertField",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.static.field": "licenseId",
    "transforms.InsertField.static.value": "${file:/data/credentials.properties:LICENSE_ID}"
  }
}
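
The API call itself is nothing special; roughly the following, assuming the JSON above is saved as mysql-connector.json and the worker listens on the default REST port:

# Register the connector; the payload is the JSON document shown above
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-connector.json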

The CDC connector fails:

{"name":"mysql-connector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata\n"}],"type":"source"}

I tried running everything in debug mode and searching the output for anything usable, but couldn't find anything. Do you know what I'm doing wrong?
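
For anyone trying to reproduce this: the log level can be raised at runtime through the Connect REST API's admin endpoint instead of restarting the container. A sketch, assuming the default port:

# Turn on DEBUG logging for the Debezium classes at runtime
curl -s -X PUT http://localhost:8083/admin/loggers/io.debezium \
  -H "Content-Type: application/json" \
  -d '{"level": "DEBUG"}'

# Then inspect the failing task again
curl -s http://localhost:8083/connectors/mysql-connector/status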

I've tried different configurations and looked at similar guides:

https://docs.confluent.io/cloud/current/cp-component/connect-cloud-config.html

https://rmoff.net/2019/10/16/using-kafka-connect-and-debezium-with-confluent-cloud/

I also tried hardcoding the credentials in case the credentials or the env import were the problem, but that wasn't it.
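
A basic connectivity check that rules out DNS and firewall problems looks like this (substitute the real bootstrap host from the env file; with network_mode: host the container shares the host's network stack, so running it on the host is equivalent):

# A successful TLS handshake means the Confluent Cloud endpoint is reachable
openssl s_client -connect xxx.eu-central-1.aws.confluent.cloud:9092 </dev/null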


I forgot to emphasize that I'm using an .env file with the docker-compose script to inject the credentials. The file sits next to docker-compose.yml and contains the following lines:

KAFKA_API_KEY=xxx
KAFKA_API_SECRET=xxx
KAFKA_BOOTSTRAP_SERVER=xxx.eu-central-1.aws.confluent.cloud:9092
LICENSE_ID=xxx
CONFIG_STORAGE_TOPIC=xxx-config
OFFSET_STORAGE_TOPIC=xxx-offset
STATUS_STORAGE_TOPIC=xxx-status
SERVER_NAME=analytics01

As shown above, I also mount the same file into the Connect container and reference the credentials in the connector config with, for example:

${file:/data/credentials.properties:KAFKA_API_KEY}
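
FileConfigProvider loads that file as Java properties, so the plain KEY=value lines above resolve fine. A quick check that the mount and the keys line up (service name connect as in the compose file):

# The file the connector config references must be visible inside the container
docker compose exec connect cat /data/credentials.properties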

mysql apache-kafka docker-compose apache-kafka-connect debezium
1 Answer

Timeout expired while fetching topic metadata

Either:

  1. Your bootstrap server is wrong (try it without the env file; you also don't need to mount it as a volume)
  2. Your API key is incorrect
  3. A firewall/proxy is rejecting the requests

Also, localhost refers to the Connect container itself, not your database. If the database isn't in a container, see https://docs.docker.com/compose/how-tos/networking/ (hint: remove host network mode and use host.docker.internal).
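
To narrow down which of these it is, test raw reachability from inside the container, for example with bash's built-in /dev/tcp (host.docker.internal assumes host networking was removed; on Linux you also need an extra_hosts entry mapping it to host-gateway):

# Can the worker reach the Kafka bootstrap server? (substitute your host)
docker compose exec connect bash -c \
  'timeout 5 bash -c "</dev/tcp/xxx.eu-central-1.aws.confluent.cloud/9092" && echo reachable'

# Can the worker reach MySQL on the EC2 host?
docker compose exec connect bash -c \
  'timeout 5 bash -c "</dev/tcp/host.docker.internal/3306" && echo reachable'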
