我正在尝试使用 Apache Camel 4.4.3 及其 Java 17 上的 Kafka 组件(在 Windows 10 下运行)将消息发送到 Azure 事件中心(高级)。虽然我已成功使用其他 Kafka 客户端(例如 Kafka UI、Confluence 的 Java 客户端) ,并直接通过 Apache Camel 的 Azure Eventhubs 组件)连接和发送消息,Camel Kafka 组件在身份验证过程中始终失败。
这是配置 Kafka 端点的代码的基本部分:
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() {
String kafkaEndpointUri = "kafka:{{eventhub.name}}?brokers={{eventhub.namespace}}.servicebus.windows.net:9093"
+ "&securityProtocol=SASL_SSL"
+ "&saslMechanism=PLAIN"
+ "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{{eventhub.connectionstring}}\";";
from("timer://foo?repeatCount=1")
.setBody(constant("Hello from Camel to Azure EventHub!"))
.to(kafkaEndpointUri);
}
});
应用程序在 SASL 身份验证期间抛出运行时异常:
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to SEND_APIVERSIONS_REQUEST
[main] DEBUG org.apache.camel.impl.DefaultCamelContext - start() took 536 millis
Camel application is running. Press Ctrl + C to terminate.
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Creating SaslClient: client=null;service=kafka;serviceHostname=EVENTHUB_NAMESPACE.servicebus.windows.net;mechs=[PLAIN]
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.SslTransportLayer - [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=EVENTHUB_NAMESPACE.servicebus.windows.net/XXX.XXX.XXX.XXX:9093], selector=sun.nio.ch.WEPollSelectorImpl@2643665b, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'EVENTHUB_NAMESPACE.servicebus.windows.net' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net, O=Microsoft Corporation, L=Redmond, ST=WA, C=US' protocol 'TLSv1.3' cipherSuite 'TLS_AES_256_GCM_SHA384'
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to SEND_HANDSHAKE_REQUEST
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to INITIAL
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - [Producer clientId=producer-1] Set SASL client state to INTERMEDIATE
[Camel (camel-1) thread #1 - timer://foo] DEBUG org.apache.camel.processor.SendProcessor - >>>> kafka://EVENTHUB_NAME?brokers=EVENTHUB_NAMESPACE.servicebus.windows.net%3A9093&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL Exchange[]
[Camel (camel-1) thread #1 - timer://foo] DEBUG org.apache.camel.component.kafka.KafkaProducer - Sending message to topic: EVENTHUB_NAME, partition: null, key: null
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Unexpected error from EVENTHUB_NAMESPACE.servicebus.windows.net/XXX.XXX.XXX.XXX (channelId=-1); closing connection
java.lang.RuntimeException: non-nullable field authBytes was serialized as null
带有事件中心组件的 Apache Camel 的工作示例如下所示:
...
.to(String.format("azure-eventhubs:?connectionString=RAW(%s)", connectionStringWithTopic));
Confluences Java 客户端的工作示例:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectioString));
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
Producer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(eventHubName, "key", "This is a message from regular Java app using Confluent Kafka!");
producer.send(record);
鉴于使用其他客户端时没有问题,我怀疑 Apache Camel Kafka 组件和 Azure 事件中心之间可能存在兼容性问题,或者与 Camel 存在特定的配置细微差别。我尝试过设置一堆附加属性,例如 sslProtocol、clientId 等,但我的尝试都没有成功。
有没有人经历过类似的问题或可以提供有关这里可能出现问题的见解? Apache Camel 中是否有我可能忽略的特定设置或配置?
感谢Ralphi1找到我的问题的解决方案。这是解决我的问题的代码:
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() {
String kafkaEndpointUri = "kafka:{{eventhub.name}}?brokers={{eventhub.namespace}}.servicebus.windows.net:9093"
+ "&securityProtocol=SASL_SSL"
+ "&saslMechanism=PLAIN"
+ "&saslJaasConfig=RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{{eventhub.connectionstring}}\";)";
from("timer://foo?repeatCount=1")
.setBody(constant("Hello from Camel to Azure EventHub!"))
.to(kafkaEndpointUri);
}
});
这里的关键是使用
RAW()
函数来确保连接字符串中的特殊字符不会被 Camel 解析器误解。