我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server:9092 --topic sample_topic --time -1
但是,现在我们只打开了 SSL 端口,所以我尝试将 SSL 详细信息作为属性文件传递
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server:9093 --topic sample_topic --time -1 --consumer-config /path/to/file
出现以下错误 -
Exception in thread "main" joptsimple.UnrecognizedOptionException: consumer-config is not a recognized option
如何将 SSL 详细信息传递给此命令?这些是 kafka-run-class.sh kafka.tools.GetOffsetShell 的所有可用参数
--broker-list <String: hostname:and port,...,hostname:port>
--max-wait-ms <Integer: ms>
--offsets <Integer: count>
--partitions <String: partition ids>
--time <Long: timestamp/-1(latest)/-2
--topic <String: topic>
不幸的是
kafka.tools.GetOffsetShell
仅支持纯文本连接。这个工具使用得不多,也没有人费心更新它。
根据您的使用案例,您有以下几种选择:
使用
kafka-consumer-groups.sh
工具:假设您有一个消费者组从该主题消费,该工具会显示每个分区的日志结束偏移量补丁
kafka.tools.GetOffsetShell
:通过重用其他工具的逻辑来添加对安全连接的支持非常容易。如果您这样做,请考虑向 Kafka 发送补丁 =)Consumer.endOffsets()
kafka.tools.DumpLogSegments
:作为最后的手段,此工具也可用于查找最后的偏移量我通过运行以下命令成功在 docker 镜像上检索到它:
quay.io/strimzi/kafka:0.29.0-kafka-3.2.0
其中
./bin/kafka-get-offsets.sh --bootstrap-server "$BOOTSTRAPSERVER" --command-config kafkaConfig.properties --topic "$TOPIC" --time -1
在我的例子中包含
kafkaConfig.properties
属性:SASL/SSL
从
kafka-3.x开始security.protocol=SSL
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.type=PEM
ssl.truststore.location=/var/kafka-server-ca-cert.pem
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-user" password="******";
可以与SASL一起使用。 SASL 配置通过
GetOffsetShell
选项传递。查看github上的源代码:GetOffsetShell.scala