I have an EC2 machine with a MySQL DB (not containerized). I now want to stream CDC events to Confluent-managed Kafka (Confluent Cloud). I'm using this docker compose file to start the Connect platform on the same host:
services:
  connect:
    image: confluentinc/cp-kafka-connect:latest
    network_mode: host
    env_file:
      - .env
    ports:
      - "8083:8083"
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.2
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "${KAFKA_BOOTSTRAP_SERVER}"
      CONNECT_TOPIC_CREATION_ENABLE: "false"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SASL_USERNAME: "${KAFKA_API_KEY}"
      CONNECT_SASL_PASSWORD: "${KAFKA_API_SECRET}"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_API_KEY}\" password=\"${KAFKA_API_SECRET}\";"
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_CONFIG_STORAGE_TOPIC: "${CONFIG_STORAGE_TOPIC}"
      CONNECT_OFFSET_STORAGE_TOPIC: "${OFFSET_STORAGE_TOPIC}"
      CONNECT_STATUS_STORAGE_TOPIC: "${STATUS_STORAGE_TOPIC}"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
    volumes:
      - .env:/data/credentials.properties
This part seems to work: I don't get any errors here, and after bringing this up, the meta topics I created for config, offsets, and status get populated.
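For reference, the worker and the installed Debezium plugin can also be checked against the Connect REST API on port 8083, along these lines (jq is only used for readability):
curl -s http://localhost:8083/ | jq
curl -s http://localhost:8083/connector-plugins | jq '.[].class'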
Now I'm creating the CDC connector with an API call and this config:
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "debezium",
    "database.server.id": 1,
    "database.include.list": "mydb",
    "snapshot.mode": "schema_only",
    "topic.prefix": "analytics01",
    "table.include.list": "mydb.table1,mydb.table2",
    "schema.history.internal.kafka.bootstrap.servers": "${file:/data/credentials.properties:KAFKA_BOOTSTRAP_SERVER}",
    "schema.history.internal.kafka.topic": "${file:/data/credentials.properties:SERVER_NAME}.schemaChanges",
    "schema.history.consumer.security.protocol": "SASL_SSL",
    "schema.history.consumer.ssl.endpoint.identification.algorithm": "https",
    "schema.history.consumer.sasl.mechanism": "PLAIN",
    "schema.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:KAFKA_API_KEY}\" password=\"${file:/data/credentials.properties:KAFKA_API_SECRET}\";",
    "schema.history.producer.security.protocol": "SASL_SSL",
    "schema.history.producer.ssl.endpoint.identification.algorithm": "https",
    "schema.history.producer.sasl.mechanism": "PLAIN",
    "schema.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/data/credentials.properties:KAFKA_API_KEY}\" password=\"${file:/data/credentials.properties:KAFKA_API_SECRET}\";",
    "decimal.handling.mode": "double",
    "transforms": "InsertField",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.static.field": "licenseId",
    "transforms.InsertField.static.value": "${file:/data/credentials.properties:LICENSE_ID}"
  }
}
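The call itself is just a POST of that JSON against the worker's REST endpoint, roughly like this (mysql-connector.json is a placeholder name for the payload above):
curl -s -X POST -H "Content-Type: application/json" \
  --data @mysql-connector.json \
  http://localhost:8083/connectors | jq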
The CDC connector fails:
{"name":"mysql-connector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata\n"}],"type":"source"}
I tried running everything in debug mode and searching the output for anything useful, but couldn't find anything. Do you know what I'm doing wrong?
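(One way to get more verbose output without restarting the worker is the /admin/loggers endpoint of the Connect REST API; the logger names below are just the packages that seem relevant here:)
curl -s -X PUT -H "Content-Type: application/json" -d '{"level": "DEBUG"}' \
  http://localhost:8083/admin/loggers/io.debezium
curl -s -X PUT -H "Content-Type: application/json" -d '{"level": "DEBUG"}' \
  http://localhost:8083/admin/loggers/org.apache.kafka.clients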
I tried different configurations and looked at similar guides such as
https://docs.confluent.io/cloud/current/cp-component/connect-cloud-config.html
or
https://rmoff.net/2019/10/16/using-kafka-connect-and-debezium-with-confluent-cloud/
I also tried hardcoding the credentials, in case something was wrong with the credentials themselves or with the environment import, but that wasn't it.
I forgot to stress that I'm injecting the credentials into the docker compose setup via an .env file. The file sits next to docker-compose.yml and contains the following lines:
KAFKA_API_KEY=xxx
KAFKA_API_SECRET=xxx
KAFKA_BOOTSTRAP_SERVER=xxx.eu-central-1.aws.confluent.cloud:9092
LICENSE_ID=xxx
CONFIG_STORAGE_TOPIC=xxx-config
OFFSET_STORAGE_TOPIC=xxx-offset
STATUS_STORAGE_TOPIC=xxx-status
SERVER_NAME=analytics01
Likewise, I mount that same file into the connect container and reference the credentials as shown in the connector config, e.g.
${file:/data/credentials.properties:KAFKA_API_KEY}
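To rule out a plain connectivity/auth problem from inside the container, a check along these lines can be run (this assumes the cp-kafka-connect image ships the standard Kafka CLI tools; /tmp/client.properties is just a throwaway file reusing the same credentials from the environment):
docker compose exec connect bash -c '
  cat > /tmp/client.properties <<EOF
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$KAFKA_API_KEY" password="$KAFKA_API_SECRET";
EOF
  kafka-broker-api-versions --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \
    --command-config /tmp/client.properties | head
'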
A "Timeout expired while fetching topic metadata" means the Kafka client inside the connector task could not reach, or could not authenticate against, the brokers it was configured with.
Additionally, localhost refers to the Connect container itself, not your database. If the database is not running in a container, see https://docs.docker.com/compose/how-tos/networking/ (hint: drop the host network mode and use host.docker.internal).
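For reference, a minimal sketch of that suggestion (default bridge network instead of host mode, plus the host-gateway alias so the container can reach MySQL on the EC2 host) could look roughly like this:
services:
  connect:
    image: confluentinc/cp-kafka-connect:latest
    # no network_mode: host -> the default bridge network is used
    ports:
      - "8083:8083"
    extra_hosts:
      - "host.docker.internal:host-gateway"  # resolves to the EC2 host from inside the container
    # ...rest of the service definition unchanged...
The connector's "database.hostname" would then be "host.docker.internal" instead of "localhost" (with MySQL bound to an interface the Docker bridge network can reach).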