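I started a Docker container (cp-kafka-connect-base:7.0.1) and installed a self-managed connector (debezium-connector-mysql:latest). The Docker container runs fine. The problem appears when I try to configure the connector as follows: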
{
"name": "MySqlConnectorConnector_0",
"config": {
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule",
"database.history.kafka.topic": "pageviews",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm": "https",
"schema.history.internal.kafka.topic": "PLAIN",
"database.whitelist": "database-test",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule",
"database.history.producer.ssl.endpoint.identification.algorithm": "https",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.server.name": "database-test:3306",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"database.history.consumer.sasl.mechanism": "PLAIN",
"name": "MySqlConnectorConnector_0",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topic.prefix": "test2",
"database.hostname": "database-test",
"database.port": "3306",
"database.user": "admin",
"database.password": "**********",
"database.server.id": "11",
"database.ssl.mode": "disabled",
"connect.keep.alive": "true",
"include.schema.changes": "true",
"inconsistent.schema.handling.mode": "skip"
}
}
The connector is created, but the connector task fails.
Connector logs:
[2023-01-24 17:25:44,827] INFO [Consumer clientId=test3-schemahistory, groupId=test3-schemahistory] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-01-24 17:25:44,827] WARN [Consumer clientId=test3-schemahistory, groupId=test3-schemahistory] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-01-24 17:25:44,827] WARN [Consumer clientId=test3-schemahistory, groupId=test3-schemahistory] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2023-01-24 17:25:44,831] INFO WorkerSourceTask{id=MySqlConnectorConnector_01-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2023-01-24 17:25:44,831] WARN Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask)
[2023-01-24 17:25:44,873] INFO [Consumer clientId=test3-schemahistory, groupId=test3-schemahistory] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2023-01-24 17:25:44,873] INFO [Consumer clientId=test3-schemahistory, groupId=test3-schemahistory] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2023-01-24 17:25:44,874] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2023-01-24 17:25:44,874] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2023-01-24 17:25:44,874] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2023-01-24 17:25:44,875] INFO App info kafka.consumer for test3-schemahistory unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-24 17:25:44,875] ERROR WorkerSourceTask{id=MySqlConnectorConnector_01-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2023-01-24 17:33:57,334] INFO [Consumer clientId=test3-schemahistory, groupId=test3-schemahistory] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-01-24 17:33:57,334] WARN [Consumer clientId=test3-schemahistory, groupId=test3-schemahistory] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
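I tried adding some extra configuration parameters, but the documentation is unclear to me. I can't figure out how to fix this or what the connector configuration should look like.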
As the error says, schema.history.internal.kafka.bootstrap.servers needs to be a host:port list of Kafka brokers, not a Java class name.
Similarly, schema.history.internal.kafka.topic should be a plain string (the name of a topic), not a Java class name.
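As a rough sketch of what those two properties might look like once corrected: the broker address <broker-host>:9092 and the topic name schema-changes.test2 below are placeholders I made up for illustration, not values taken from the question, so substitute whatever matches your setup.

```json
{
  "schema.history.internal.kafka.bootstrap.servers": "<broker-host>:9092",
  "schema.history.internal.kafka.topic": "schema-changes.test2"
}
```

Note that the logs in the question show the client failing to reach localhost/127.0.0.1:9092. Inside a Docker container, localhost refers to the container itself, so unless a broker is running in that same container, the address probably has to be one that is reachable from inside the Connect container. Also, if the installed connector is a Debezium 2.x release (plausible with the latest tag), the older database.history.* property names were renamed to schema.history.internal.*, so the producer and consumer security settings would likely need the new prefix as well.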
Did you solve this? If so, please let me know your solution. I'm running into the same error.