I have a single-node Kafka cluster with ACLs enabled over SASL_PLAINTEXT. Below are the configurations for my Kafka Connect worker and for the Kafka broker.
connect-distributed.properties
bootstrap.servers=<ip>:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=False
value.converter.schemas.enable=False
offset.storage.topic=connect-offsets
offset.storage.replication.factor= 1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=100
offset.storage.partitions=8
config.storage.partitions=1
status.storage.partitions=2
rest.advertised.host.name=<ip>
plugin.path=/opt/kafka/connectors
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="username" \
password="password"
server.properties
broker.id=14148
host.name=<ip>
advertised.listeners=SASL_PLAINTEXT://<ip>:9092
listeners=SASL_PLAINTEXT://<ip>:9092
num.network.threads=3
num.io.threads=8
num.partitions=12
default.replication.factor=1
offsets.topic.replication.factor=1
unclean.leader.election.enable=false
auto.create.topics.enable=true
log.dirs=/data/data-kafka/
log.dir=/data/data-kafka/
reserved.broker.max.id=500000
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=120
log.retention.bytes=104857600
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=true
zookeeper.connect=<ip>:2181
zookeeper.connection.timeout.ms=10000
delete.topic.enable=true
queued.max.requests=1000
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin;
allow.everyone.if.no.acl.found=false
Below is the connector configuration I am submitting:
{
"name": "connector4",
"config": {
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_PLAINTEXT",
"producer.sasl.mechanism": "PLAIN",
"producer.security.protocol": "SASL_PLAINTEXT",
"producer.request.timeout.ms": 50000,
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
"producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "none",
"include.schema.changes": "false",
"schema.history.store.only.captured.tables.ddl": "true",
"tombstones.on.delete": "false",
"topic.prefix": "db4",
"decimal.handling.mode": "double",
"schema.history.internal.kafka.topic": "schema.history.v4",
"database.user": "db-user",
"producer.override.compression.type": "snappy",
"database.server.id": "1005",
"event.deserialization.failure.handling.mode": "warn",
"schema.history.internal.kafka.bootstrap.servers": "<kakfa-ip>:9092",
"database.port": "3307",
"inconsistent.schema.handling.mode": "warn",
"database.hostname": "db-ip",
"database.password": "db-pwd",
"name": "connector4",
"table.include.list": "db1.table_dummy",
"database.include.list": "db1",
"snapshot.mode": "when_needed",
"schema.history.skip.unparseable.ddl": "true"
}
}
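The connector is submitted successfully, but when I check its status after a while, I can see that the task has failed with a message.
Looking further into the Kafka Connect logs, I can see the error message below: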
Cancelled in-flight metadata request with correlation id. Request timed out.
All of the configuration follows the Confluent Kafka Connect documentation, but I still cannot get it to work. Please help.
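After going through the source code of Debezium and Kafka Connect, I got it working. Debezium's schema history uses its own Kafka producer and consumer, and these do not pick up the security settings given elsewhere in the configuration. The following properties need to be added to the connector configuration JSON when submitting the connector: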
"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pwd\";",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pwd\";",
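If the connector JSON is generated by a script, the six properties above could be added programmatically. The following is only a minimal sketch: the helper name with_schema_history_sasl and the sample values are hypothetical, and it assumes SASL/PLAIN over SASL_PLAINTEXT exactly as in the setup described in the question.

```python
import json

# The two client prefixes used by the properties listed above.
CLIENT_PREFIXES = (
    "schema.history.internal.producer.",
    "schema.history.internal.consumer.",
)


def with_schema_history_sasl(config, username, password):
    """Return a copy of a connector config with SASL/PLAIN settings added
    for the schema history producer and consumer (hypothetical helper)."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    updated = dict(config)  # leave the caller's dict untouched
    for prefix in CLIENT_PREFIXES:
        updated[prefix + "security.protocol"] = "SASL_PLAINTEXT"
        updated[prefix + "sasl.mechanism"] = "PLAIN"
        updated[prefix + "sasl.jaas.config"] = jaas
    return updated


if __name__ == "__main__":
    # Trimmed-down sample config; the values are placeholders.
    base = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "topic.prefix": "db4",
        "schema.history.internal.kafka.topic": "schema.history.v4",
    }
    payload = {
        "name": "connector4",
        "config": with_schema_history_sasl(base, "user", "pwd"),
    }
    print(json.dumps(payload, indent=2))
```

The printed JSON can then be submitted to the Kafka Connect REST API in the same way as the original connector configuration.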
Ideally, this should be documented somewhere. Happy coding! I will raise the same with the Debezium folks.