如何在Kafka-Connect(群集内)中启用SASL

问题描述 投票:0回答:1

我已经下载了cp-kafka-connect,并通过接受安全连接的KafKa代理部署在我的k8s集群中。 (SASL)

我想为Kafka Connect启用安全性(SASL)。

我正在使用ConfigMap将名为connect-distributed.properties的配置文件装入cp-kafka-connect容器(在etc / kafka中)

这是配置文件的一部分:

sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required 
    username="admin" password="password-secret";

但是它不能从错误开始。

以下是日志:

kubectl logs test-cp-kafka-connect-846f4b745f-hx2mp
===> ENV Variables ...
ALLOW_UNSIGNED=false
COMPONENT=kafka-connect
CONFLUENT_DEB_VERSION=1
CONFLUENT_PLATFORM_LABEL=
CONFLUENT_VERSION=5.5.0
CONNECT_BOOTSTRAP_SERVERS=PLAINTEXT://test-kafka:9092
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
CONNECT_CONFIG_STORAGE_TOPIC=test-cp-kafka-connect-config
CONNECT_GROUP_ID=test
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://test-cp-schema-registry:8081
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
CONNECT_OFFSET_STORAGE_TOPIC=test-cp-kafka-connect-offset
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components
CONNECT_REST_ADVERTISED_HOST_NAME=10.233.85.127
CONNECT_REST_PORT=8083
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
CONNECT_STATUS_STORAGE_TOPIC=test-cp-kafka-connect-status
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://test-cp-schema-registry:8081
CUB_CLASSPATH=/etc/confluent/docker/docker-utils.jar
HOME=/root
HOSTNAME=test-cp-kafka-connect-846f4b745f-hx2mp
KAFKA_ADVERTISED_LISTENERS=
KAFKA_HEAP_OPTS=-Xms512M -Xmx512M
KAFKA_JMX_PORT=5555
KAFKA_VERSION=
KAFKA_ZOOKEEPER_CONNECT=
KUBERNETES_PORT=tcp://10.233.0.1:443
KUBERNETES_PORT_443_TCP=tcp://10.233.0.1:443
KUBERNETES_PORT_443_TCP_ADDR=10.233.0.1
KUBERNETES_PORT_443_TCP_PORT=443
KUBERNETES_PORT_443_TCP_PROTO=tcp
KUBERNETES_SERVICE_HOST=10.233.0.1
KUBERNETES_SERVICE_PORT=443
KUBERNETES_SERVICE_PORT_HTTPS=443
LANG=C.UTF-8
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
PWD=/
PYTHON_PIP_VERSION=8.1.2
PYTHON_VERSION=2.7.9-1
SCALA_VERSION=2.12
SHLVL=1
TEST_0_EXTERNAL_PORT=tcp://10.233.13.164:19092
TEST_0_EXTERNAL_PORT_19092_TCP=tcp://10.233.13.164:19092
TEST_0_EXTERNAL_PORT_19092_TCP_ADDR=10.233.13.164
TEST_0_EXTERNAL_PORT_19092_TCP_PORT=19092
TEST_0_EXTERNAL_PORT_19092_TCP_PROTO=tcp
TEST_0_EXTERNAL_SERVICE_HOST=10.233.13.164
TEST_0_EXTERNAL_SERVICE_PORT=19092
TEST_0_EXTERNAL_SERVICE_PORT_EXTERNAL_BROKER=19092
TEST_CP_KAFKA_CONNECT_PORT=tcp://10.233.38.137:8083
TEST_CP_KAFKA_CONNECT_PORT_8083_TCP=tcp://10.233.38.137:8083
TEST_CP_KAFKA_CONNECT_PORT_8083_TCP_ADDR=10.233.38.137
TEST_CP_KAFKA_CONNECT_PORT_8083_TCP_PORT=8083
TEST_CP_KAFKA_CONNECT_PORT_8083_TCP_PROTO=tcp
TEST_CP_KAFKA_CONNECT_SERVICE_HOST=10.233.38.137
TEST_CP_KAFKA_CONNECT_SERVICE_PORT=8083
TEST_CP_KAFKA_CONNECT_SERVICE_PORT_KAFKA_CONNECT=8083
TEST_KAFKA_EXPORTER_PORT=tcp://10.233.5.215:9308
TEST_KAFKA_EXPORTER_PORT_9308_TCP=tcp://10.233.5.215:9308
TEST_KAFKA_EXPORTER_PORT_9308_TCP_ADDR=10.233.5.215
TEST_KAFKA_EXPORTER_PORT_9308_TCP_PORT=9308
TEST_KAFKA_EXPORTER_PORT_9308_TCP_PROTO=tcp
TEST_KAFKA_EXPORTER_SERVICE_HOST=10.233.5.215
TEST_KAFKA_EXPORTER_SERVICE_PORT=9308
TEST_KAFKA_EXPORTER_SERVICE_PORT_KAFKA_EXPORTER=9308
TEST_KAFKA_MANAGER_PORT=tcp://10.233.7.186:9000
TEST_KAFKA_MANAGER_PORT_9000_TCP=tcp://10.233.7.186:9000
TEST_KAFKA_MANAGER_PORT_9000_TCP_ADDR=10.233.7.186
TEST_KAFKA_MANAGER_PORT_9000_TCP_PORT=9000
TEST_KAFKA_MANAGER_PORT_9000_TCP_PROTO=tcp
TEST_KAFKA_MANAGER_SERVICE_HOST=10.233.7.186
TEST_KAFKA_MANAGER_SERVICE_PORT=9000
TEST_KAFKA_MANAGER_SERVICE_PORT_KAFKA_MANAGER=9000
TEST_KAFKA_PORT=tcp://10.233.12.237:9092
TEST_KAFKA_PORT_8001_TCP=tcp://10.233.12.237:8001
TEST_KAFKA_PORT_8001_TCP_ADDR=10.233.12.237
TEST_KAFKA_PORT_8001_TCP_PORT=8001
TEST_KAFKA_PORT_8001_TCP_PROTO=tcp
TEST_KAFKA_PORT_9092_TCP=tcp://10.233.12.237:9092
TEST_KAFKA_PORT_9092_TCP_ADDR=10.233.12.237
TEST_KAFKA_PORT_9092_TCP_PORT=9092
TEST_KAFKA_PORT_9092_TCP_PROTO=tcp
TEST_KAFKA_SERVICE_HOST=10.233.12.237
TEST_KAFKA_SERVICE_PORT=9092
TEST_KAFKA_SERVICE_PORT_BROKER=9092
TEST_KAFKA_SERVICE_PORT_KAFKASHELL=8001
TEST_ZOOKEEPER_PORT=tcp://10.233.1.144:2181
TEST_ZOOKEEPER_PORT_2181_TCP=tcp://10.233.1.144:2181
TEST_ZOOKEEPER_PORT_2181_TCP_ADDR=10.233.1.144
TEST_ZOOKEEPER_PORT_2181_TCP_PORT=2181
TEST_ZOOKEEPER_PORT_2181_TCP_PROTO=tcp
TEST_ZOOKEEPER_SERVICE_HOST=10.233.1.144
TEST_ZOOKEEPER_SERVICE_PORT=2181
TEST_ZOOKEEPER_SERVICE_PORT_CLIENT=2181
ZULU_OPENJDK_VERSION=8=8.38.0.13
_=/usr/bin/env
appID=dAi5R82Pf9xC38kHkGeAFaOknIUImdmS-1589882527
cluster=test
datacenter=testx
namespace=mynamespace
workspace=8334431b-ef82-414f-9348-a8de032dfca7
===> User
uid=0(root) gid=0(root) groups=0(root)
===> Configuring ...
===> Running preflight checks ...
===> Check if Kafka is healthy ...
[main] INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
        bootstrap.servers = [PLAINTEXT://test-kafka:9092]
        client.dns.lookup = default
        client.id =
        connections.max.idle.ms = 300000
        default.api.timeout.ms = 60000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 5.5.0-ccs
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 785a156634af5f7e
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1589883940496
[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.clients.admin.internals.AdminMetadataManager - [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1589883970509) timed out at 1589883970510 after 281 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

错误是:

[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.clients.admin.internals.AdminMetadataManager - [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1589883970509) timed out at 1589883970510 after 281 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 

请参阅此方法:sasl-scram-connect-workers

有人可以帮我解决这个问题吗?

apache-kafka kubernetes-helm apache-kafka-connect
1个回答
0
投票

更改boostrapServers参数以指向SASL listerner。例如:

SASL_SSL://test-kafka:9093
© www.soinside.com 2019 - 2024. All rights reserved.