我正在尝试在我的项目中使用 Azure 上托管的 Confluence Cloud,我们确实有 API 密钥和密钥,但我无法连接并且无法读取来自 kafka 主题的消息。我浏览了链接:https://docs.confluence.io/cloud/current/access-management/authenticate/api-keys/best-practices-api-keys.html,但不清楚如何设置它进入Java消费者代码并且也遇到以下错误
Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed
我使用了下面的代码
public class HelloConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX.azure.confluent.cloud:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "someid");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + "apiKeyValue" + "\" password=\"" + "secreteValue" + "\";");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("test-topic"));
while(true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records){
System.out.println("Key: " + record.key() + ", Value: " + record.value());
System.out.println("Partition: " + record.partition() + ", Offset:" + record.offset());
}
}
}
在没有对代码进行任何测试的情况下,这看起来像是变量引用问题? 您正在
props.put(SaslConfigs.SASL_JAAS_CONFIG, ...
中加入用户名和密码字符串。
username=\"" + "apiKeyValue" + "\" password=\"" + "secreteValue" + "\";");
应该是
username=\"" + apiKeyValue + "\" password=\"" + secreteValue + "\";");
解决这个问题后可能会出现其他问题,但我会从那里开始。