我在使用简单的 Kafka 消费者程序时遇到了一些麻烦:
18/06/04 18:13:49 ERROR /log/log.txt: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524)
at com.carrefour.entequadratura.KafkaHandler.createConsumer(KafkaHandler.java:96)
at com.carrefour.entequadratura.KafkaHandler.runConsumer(KafkaHandler.java:104)
at com.carrefour.entequadratura.Main.main(Main.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:577)
... 14 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:295)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44)
at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
... 17 more
这些是我的财产:
BOOTSTRAP_SERVERS=xxxxxxxxxxxxxxxxxx:6667
GROUP_ID=EnteLoader
AUTO_COMMIT=false
AUTO_COMMIT_INTERVAL=10000
SESSION_TIMEOUT=30000
MAX_POLL_RECORDS=5
KEY_DESERIALIZER=org.apache.kafka.common.serialization.StringDeserializer
VALUE_DESERIALIZER=org.apache.kafka.common.serialization.StringDeserializer
SECURITY_PROTOCOL=SASL_PLAINTEXT
SASL_MECHANISM=GSSAPI
SASL_KERBEROS_SERVICE_NAME=kafka
我读到这可能是与 jaas.conf 相关的一个可能的问题,但我是 Kafka 的新手,我不知道如何找到它..
你能帮我一下吗? 谢谢!
您可以通过两种方式将 jaas conf 传递给您的 kafka 消费者。
如果您使用的kafka-client版本大于0.10.2.1,您可以设置一个属性
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";
正如您的错误消息所示,您可以设置系统属性
java.security.auth.login.config
,为此,您需要将 jaas 配置字符串放入文件中,并将该路径作为上述系统属性的值。KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="yourServiceName"
username="userName"
password="password";
};
然后设置环境变量:
System.setProperty("java.security.auth.login.config","/path/to/jaas.conf");
我会推荐第一个选项,因为我在选择第二个选项时遇到了一些问题。
检查您的 application.properties 是否设置为 kafka.security.protocol=SASL_PLAINTEXT
然后需要在您的属性中设置 JAAS 身份验证。 System.setProperty("java.security.auth.login.config","/File_Location/jaas.conf");
如果您正在使用身份验证测试本地。将身份验证协议设置为 Plain_text as
kafka.security.protocol=纯文本
其经过测试的代码。对我来说工作得很好。
我试图从本地连接到远程 Kafka 代理,但遇到了同样的错误。
Kafka 官方文档的说法与上述答案相同https://kafka.apache.org/documentation/#security_jaas_client。但是,我对
sasl.jaas.config
不成功,因此使用了静态。
有几点需要注意:
jaas.conf
内容取决于sasl.mechanism
。sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alice" \
password="alice-secret";
成为
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice-secret";
};
请参阅官方文档以获取正确的 jaas.conf:https://kafka.apache.org/documentation/#security_client_staticjaas
我在使用 spring-kafka 时遇到了同样的错误(spring-boot 版本:
3.0.2
,spring-cloud 版本:2022.0.1
)。在环境中使用 export
-ing KAFKA_USERNAME
和 KAFKA_PASSWORD
并在 spring-boot 的 application.yml
中设置下面的属性对我有用。
spring:
kafka:
properties:
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username='${KAFKA_USERNAME}' password='${KAFKA_PASSWORD}';
security:
protocol: SASL_SSL