ACL configuration not working in Kafka Connect


I have a single-node Kafka cluster with ACLs enabled over SASL_PLAINTEXT. Below are the configurations for my Kafka Connect worker and my Kafka broker, respectively.

connect-distributed.properties

bootstrap.servers=<ip>:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=100
offset.storage.partitions=8
config.storage.partitions=1
status.storage.partitions=2
rest.advertised.host.name=<ip>
plugin.path=/opt/kafka/connectors
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="username" \
  password="password";

producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="username" \
  password="password";
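The worker also creates consumers internally (and sink connectors consume through them), and those pick up `consumer.`-prefixed overrides rather than `producer.`-prefixed ones. A minimal sketch of the matching consumer overrides, assuming the same credentials as above:

```properties
# Hedged sketch: consumer-side SASL overrides mirroring the producer.* block above
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="username" \
  password="password";
```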

server.properties

broker.id=14148
host.name=<ip>
advertised.listeners=SASL_PLAINTEXT://<ip>:9092
listeners=SASL_PLAINTEXT://<ip>:9092
num.network.threads=3
num.io.threads=8
num.partitions=12

default.replication.factor=1
offsets.topic.replication.factor=1

unclean.leader.election.enable=false
auto.create.topics.enable=true
log.dirs=/data/data-kafka/
log.dir=/data/data-kafka/

reserved.broker.max.id=500000

socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

log.flush.interval.messages=10000
log.flush.interval.ms=1000

log.retention.hours=120
log.retention.bytes=104857600
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=true

zookeeper.connect=<ip>:2181

zookeeper.connection.timeout.ms=10000

delete.topic.enable=true
queued.max.requests=1000

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin;
allow.everyone.if.no.acl.found=false
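Since `allow.everyone.if.no.acl.found=false`, every principal other than the super user must be granted ACLs explicitly. A sketch of the grants the Connect worker's principal would need, using the topic/group names and listener from the configs above; `admin.properties` is a hypothetical file holding the super user's SASL client settings:

```shell
# Hypothetical admin.properties = SASL client config for User:admin

# Full access to Connect's three internal topics
for t in connect-offsets connect-configs connect-status; do
  kafka-acls.sh --bootstrap-server <ip>:9092 \
    --command-config admin.properties \
    --add --allow-principal User:username \
    --operation All --topic "$t"
done

# The worker consumes its internal topics as part of the connect-cluster group
kafka-acls.sh --bootstrap-server <ip>:9092 \
  --command-config admin.properties \
  --add --allow-principal User:username \
  --operation Read --group connect-cluster

# Debezium writes change events under the "db4" topic prefix,
# plus its schema history topic
kafka-acls.sh --bootstrap-server <ip>:9092 \
  --command-config admin.properties \
  --add --allow-principal User:username \
  --operation All --resource-pattern-type prefixed --topic db4
kafka-acls.sh --bootstrap-server <ip>:9092 \
  --command-config admin.properties \
  --add --allow-principal User:username \
  --operation All --topic schema.history.v4
```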

Below is the configuration for the connector I am submitting:

{
    "name": "connector4",
    "config": {
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_PLAINTEXT",
        "producer.sasl.mechanism": "PLAIN",
        "producer.security.protocol": "SASL_PLAINTEXT",
        "producer.request.timeout.ms": 50000,
        "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
        "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "snapshot.locking.mode": "none",
        "include.schema.changes": "false",
        "schema.history.store.only.captured.tables.ddl": "true",
        "tombstones.on.delete": "false",
        "topic.prefix": "db4",
        "decimal.handling.mode": "double",
        "schema.history.internal.kafka.topic": "schema.history.v4",
        "database.user": "db-user",
        "producer.override.compression.type": "snappy",
        "database.server.id": "1005",
        "event.deserialization.failure.handling.mode": "warn",
        "schema.history.internal.kafka.bootstrap.servers": "<kakfa-ip>:9092",
        "database.port": "3307",
        "inconsistent.schema.handling.mode": "warn",
        "database.hostname": "db-ip",
        "database.password": "db-pwd",
        "name": "connector4",
        "table.include.list": "db1.table_dummy",
        "database.include.list": "db1",
        "snapshot.mode": "when_needed",
        "schema.history.skip.unparseable.ddl": "true"
    }
}
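The JSON above is submitted through the worker's REST API; a sketch, assuming it is saved as `connector4.json` and the worker's REST listener is on the default port 8083:

```shell
# Submit the connector (host and filename are assumptions)
curl -X POST -H "Content-Type: application/json" \
  --data @connector4.json \
  http://<ip>:8083/connectors

# Check task state afterwards
curl http://<ip>:8083/connectors/connector4/status
```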

The connector is submitted, but after checking its status for a while I can see that the task has failed with an error message.

Looking further into the Kafka Connect logs, I can see the error message below:

Cancelled in-flight metadata request with correlation id. Request timed out. 

All configuration was done per the Kafka Connect Confluent documentation, but I still cannot get it to work. Please help.

apache-kafka apache-kafka-connect mysql-connector
1 Answer

After going through the Debezium and Kafka Connect source code, I got it working. The following properties need to be added to the connector configuration JSON when submitting the connector:

"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pwd\";",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pwd\";",

Ideally this should be documented somewhere. Happy coding! I will file the same with the Debezium folks.
