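Starting from the official example provided by Camel Quarkus, I changed the logic so that it writes to a Kafka broker. With the Camel Kafka component pointing at a local broker, everything works fine.
Things get slightly more complicated when trying to reach our Confluent Cloud broker. The security protocol we use is SASL_SSL. The following code snippet produces the log appended at the end of this question. To reproduce, the complete code is available here: https://github.com/LeonardoBonacci/camel-kafka-sasl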
final String brokers = "the-kafka-host.confluent.cloud:9092";
final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD";
from("direct:start")
.setBody(exchange -> "I do not arrive")
.log(LoggingLevel.INFO, "Sending to Kafka: ${body}")
.to("kafka:foo-topic?"
+ "brokers=" + brokers
+ "&saslMechanism=PLAIN"
+ "&securityProtocol=SASL_SSL"
+ "&sslEndpointAlgorithm=HTTPS"
+ "&saslJaasConfig=" + saslJaasConfig);
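The logged ProducerConfig appears to be correct. When I use the same credentials in a plain Kafka producer, it prints a nearly identical ProducerConfig and also writes the record to the Kafka topic. This suggests that the configuration values are propagated correctly to the underlying producer.
Reading the log shows that the SSL handshake succeeds. The next step is less successful: the SaslClientAuthenticator attempt fails.
From various contradictory blog posts and the official documentation, I cannot work out whether SASL_SSL is actually supported.
Can someone help me with this? Many thanks!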
2020-11-02 07:16:03,244 DEBUG [org.apa.cam.sup.DefaultComponent] (Quarkus Main Thread) Creating endpoint uri=[direct://start], path=[start]
2020-11-02 07:16:03,246 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) direct://start converted to endpoint: direct://start by component: org.apache.camel.component.direct.DirectComponent@28f69db6
2020-11-02 07:16:03,261 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Resolving language: simple
2020-11-02 07:16:03,266 DEBUG [org.apa.cam.rei.LogReifier] (Quarkus Main Thread) LogName is not configured, using route id as logName: route1
2020-11-02 07:16:03,267 DEBUG [org.apa.cam.imp.con.CoreTypeConverterRegistry] (Quarkus Main Thread) Promoting fallback type converter as a known type converter to convert from: org.apache.camel.LoggingLevel to: java.lang.String for the fallback converter: org.apache.camel.impl.converter.EnumTypeConverter@1fecfc7b
2020-11-02 07:16:03,269 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using ComponentResolver: org.apache.camel.quarkus.core.FastCamelContext$$Lambda$693/0x0000000840394040@51356a7e to resolve component with name: kafka
2020-11-02 07:16:03,269 DEBUG [org.apa.cam.sup.ResolverHelper] (Quarkus Main Thread) Lookup Component with name kafka in registry. Found: org.apache.camel.component.kafka.KafkaComponent@a614e14
2020-11-02 07:16:03,270 DEBUG [org.apa.cam.imp.eng.DefaultConfigurerResolver] (Quarkus Main Thread) Found configurer: kafka-component via type: org.apache.camel.component.kafka.KafkaComponentConfigurer via: META-INF/services/org/apache/camel/configurer/kafka-component
2020-11-02 07:16:03,270 DEBUG [org.apa.cam.imp.eng.DefaultConfigurerResolver] (Quarkus Main Thread) Found configurer: kafka-endpoint via type: org.apache.camel.component.kafka.KafkaEndpointConfigurer via: META-INF/services/org/apache/camel/configurer/kafka-endpoint
2020-11-02 07:16:03,272 DEBUG [org.apa.cam.sup.DefaultComponent] (Quarkus Main Thread) Creating endpoint uri=[kafka://foo-topic?brokers=the-kafka-host.confluent.cloud%3A9092&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslEndpointAlgorithm=HTTPS], path=[foo-topic]
2020-11-02 07:16:03,278 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) kafka://foo-topic?brokers=the-kafka-host.confluent.cloud%3A9092&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslEndpointAlgorithm=HTTPS converted to endpoint: kafka://foo-topic?brokers=the-kafka-host.confluent.cloud%3A9092&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslEndpointAlgorithm=HTTPS by component: org.apache.camel.component.kafka.KafkaComponent@a614e14
2020-11-02 07:16:03,282 DEBUG [org.apa.cam.sup.EventHelper] (Quarkus Main Thread) Ignoring notifying event Initialized CamelContext: camel-1. The EventNotifier has not been started yet: org.apache.camel.quarkus.core.CamelManagementEventBridge@7650b836
2020-11-02 07:16:03,282 DEBUG [org.apa.cam.sup.EventHelper] (Quarkus Main Thread) Ignoring notifying event Initialized CamelContext: camel-1. The EventNotifier has not been started yet: org.apache.camel.quarkus.core.CamelContextRuntime$1@519e862a
2020-11-02 07:16:03,283 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Resolving language: simple
2020-11-02 07:16:03,285 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Apache Camel 3.6.0 (camel-1) is starting
2020-11-02 07:16:03,287 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using ClassResolver=org.apache.camel.impl.engine.DefaultClassResolver@35084cf5, PackageScanClassResolver=org.apache.camel.impl.engine.DefaultPackageScanClassResolver@2b7ebe50, ApplicationContextClassLoader=null, RouteController=org.apache.camel.impl.engine.DefaultRouteController@5bfb4540
2020-11-02 07:16:03,289 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2020-11-02 07:16:03,289 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using HeadersMapFactory: org.apache.camel.impl.engine.DefaultHeadersMapFactory@1ae00ddf
2020-11-02 07:16:03,290 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using ReactiveExecutor: org.apache.camel.impl.engine.DefaultReactiveExecutor@1621a5c3
2020-11-02 07:16:03,290 DEBUG [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Using ThreadPoolFactory: org.apache.camel.support.DefaultThreadPoolFactory@205f397
2020-11-02 07:16:03,293 DEBUG [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Warming up route id: route1 having autoStartup=true
2020-11-02 07:16:03,304 INFO [org.apa.kaf.cli.pro.ProducerConfig] (Quarkus Main Thread) ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [the-kafka-host.confluent.cloud:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = PLAIN
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = HTTPS
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-11-02 07:16:03,515 INFO [org.apa.kaf.com.sec.aut.AbstractLogin] (Quarkus Main Thread) Successfully logged in.
2020-11-02 07:16:03,585 DEBUG [org.apa.kaf.com.sec.ssl.SslEngineBuilder] (Quarkus Main Thread) Created SSL context with keystore null, truststore null, provider SunJSSE.
2020-11-02 07:16:03,622 DEBUG [org.apa.kaf.cli.pro.int.Sender] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Starting Kafka producer I/O thread.
2020-11-02 07:16:03,623 INFO [org.apa.kaf.com.uti.AppInfoParser] (Quarkus Main Thread) Kafka version: 2.5.0
2020-11-02 07:16:03,625 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Initialize connection to node the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) for sending metadata request
2020-11-02 07:16:03,625 INFO [org.apa.kaf.com.uti.AppInfoParser] (Quarkus Main Thread) Kafka commitId: 66563e712b0b9f84
2020-11-02 07:16:03,630 INFO [org.apa.kaf.com.uti.AppInfoParser] (Quarkus Main Thread) Kafka startTimeMs: 1604254563622
2020-11-02 07:16:03,635 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Initiating connection to node the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) using address the-kafka-host.confluent.cloud/123.123.123.123
2020-11-02 07:16:03,636 DEBUG [org.apa.kaf.cli.pro.KafkaProducer] (Quarkus Main Thread) [Producer clientId=producer-1] Kafka producer started
2020-11-02 07:16:03,636 DEBUG [org.apa.cam.com.kaf.KafkaProducer] (Quarkus Main Thread) Created KafkaProducer: org.apache.kafka.clients.producer.KafkaProducer@1939e92
2020-11-02 07:16:03,644 DEBUG [org.apa.cam.imp.eng.BaseExecutorServiceManager] (Quarkus Main Thread) Created new ThreadPool for source: kafka://foo-topic?brokers=the-kafka-host.confluent.cloud%3A9092&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&sslEndpointAlgorithm=HTTPS with name: KafkaProducer[foo-topic]. -> org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@4a80dbc9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0][KafkaProducer[foo-topic]]
2020-11-02 07:16:03,646 DEBUG [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Route: route1 >>> Route[direct://start -> null]
2020-11-02 07:16:03,646 DEBUG [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Starting consumer (order: 1000) on route: route1
2020-11-02 07:16:03,648 DEBUG [org.apa.cam.sup.DefaultConsumer] (Quarkus Main Thread) Init consumer: Consumer[direct://start]
2020-11-02 07:16:03,648 DEBUG [org.apa.cam.sup.DefaultConsumer] (Quarkus Main Thread) Starting consumer: Consumer[direct://start]
2020-11-02 07:16:03,649 INFO [org.apa.cam.imp.eng.InternalRouteStartupManager] (Quarkus Main Thread) Route: route1 started and consuming from: direct://start
2020-11-02 07:16:03,652 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Total 1 routes, of which 1 are started
2020-11-02 07:16:03,652 INFO [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Apache Camel 3.6.0 (camel-1) started in 0.365 seconds
2020-11-02 07:16:03,655 INFO [io.quarkus] (Quarkus Main Thread) sasl 1.0 on JVM (powered by Quarkus 1.9.0.Final) started in 2.528s.
2020-11-02 07:16:03,656 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2020-11-02 07:16:03,661 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [camel-core, camel-direct, camel-kafka, camel-support-common, cdi]
2020-11-02 07:16:03,661 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to SEND_APIVERSIONS_REQUEST
2020-11-02 07:16:03,662 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Creating SaslClient: client=null;service=kafka;serviceHostname=the-kafka-host.confluent.cloud;mechs=[PLAIN]
2020-11-02 07:16:03,672 DEBUG [org.apa.kaf.com.net.Selector] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2020-11-02 07:16:03,740 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
2020-11-02 07:16:03,859 DEBUG [org.apa.kaf.com.net.SslTransportLayer] (kafka-producer-network-thread | producer-1) [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=the-kafka-host.confluent.cloud/123.123.123.123:9092], selector=sun.nio.ch.WindowsSelectorImpl@652ed71e, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'the-kafka-host.confluent.cloud' peerPort 9092 peerPrincipal 'CN=*.ap-southeast-2.aws.confluent.cloud' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
2020-11-02 07:16:03,895 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2020-11-02 07:16:03,935 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to SEND_HANDSHAKE_REQUEST
2020-11-02 07:16:03,936 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2020-11-02 07:16:03,974 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to INITIAL
2020-11-02 07:16:03,977 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to INTERMEDIATE
2020-11-02 07:16:05,155 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to FAILED
2020-11-02 07:16:05,155 INFO [org.apa.kaf.com.net.Selector] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Failed authentication with the-kafka-host.confluent.cloud/123.123.123.123 (Authentication failed)
2020-11-02 07:16:05,160 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Node -1 disconnected.
2020-11-02 07:16:05,160 ERROR [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Connection to node -1 (the-kafka-host.confluent.cloud/123.123.123.123:9092) failed authentication due to: Authentication failed
2020-11-02 07:16:05,161 WARN [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Bootstrap broker the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-02 07:16:05,261 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending metadata request since no node is available
2020-11-02 07:16:05,311 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Initialize connection to node the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) for sending metadata request
2020-11-02 07:16:05,311 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Initiating connection to node the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) using address the-kafka-host.confluent.cloud/123.123.123.123
2020-11-02 07:16:05,314 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to SEND_APIVERSIONS_REQUEST
2020-11-02 07:16:05,317 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Creating SaslClient: client=null;service=kafka;serviceHostname=the-kafka-host.confluent.cloud;mechs=[PLAIN]
2020-11-02 07:16:05,349 DEBUG [org.apa.kaf.com.net.Selector] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2020-11-02 07:16:05,357 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
2020-11-02 07:16:05,400 DEBUG [org.apa.kaf.com.net.SslTransportLayer] (kafka-producer-network-thread | producer-1) [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=the-kafka-host.confluent.cloud/123.123.123.123:9092], selector=sun.nio.ch.WindowsSelectorImpl@652ed71e, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'the-kafka-host.confluent.cloud' peerPort 9092 peerPrincipal 'CN=*.ap-southeast-2.aws.confluent.cloud' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
2020-11-02 07:16:05,400 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2020-11-02 07:16:05,440 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to SEND_HANDSHAKE_REQUEST
2020-11-02 07:16:05,441 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2020-11-02 07:16:05,476 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to INITIAL
2020-11-02 07:16:05,477 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to INTERMEDIATE
2020-11-02 07:16:06,722 DEBUG [org.apa.kaf.com.sec.aut.SaslClientAuthenticator] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Set SASL client state to FAILED
2020-11-02 07:16:06,722 INFO [org.apa.kaf.com.net.Selector] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Failed authentication with the-kafka-host.confluent.cloud/123.123.123.123 (Authentication failed)
2020-11-02 07:16:06,724 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Node -1 disconnected.
2020-11-02 07:16:06,724 ERROR [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Connection to node -1 (the-kafka-host.confluent.cloud/123.123.123.123:9092) failed authentication due to: Authentication failed
2020-11-02 07:16:06,727 WARN [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Bootstrap broker the-kafka-host.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-02 07:16:06,824 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending metadata request since no node is available
2020-11-02 07:16:06,876 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending metadata request since no node is available
2020-11-02 07:16:06,927 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending metadata request since no node is available
2020-11-02 07:16:06,978 DEBUG [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Give up sending metadata request since no node is available
Using the DSL solves the problem:
kafka(topic)
.brokers(props.getProperty("bootstrap.servers"))
.saslMechanism(props.getProperty("sasl.mechanism"))
.securityProtocol(props.getProperty("security.protocol"))
.sslEndpointAlgorithm(props.getProperty("ssl.endpoint.identification.algorithm"))
.saslJaasConfig(props.getProperty("sasl.jaas.config")))
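First, make sure that your saslJaasConfig:
- has the username surrounded by escaped double quotes or by single quotes: required username=\"USERNAME\" or required username='USERNAME'
- has the password surrounded by escaped double quotes or by single quotes: password=\"PASSWORD\" or password='PASSWORD'
- ends with ;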
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
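The three points above can be checked mechanically before the value ever reaches Camel. The following is only a rough sketch: JaasConfigCheck and looksWellFormed are hypothetical names, not part of Camel or Kafka, and the regular expressions assume that the credentials themselves contain no quote characters.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Camel or Kafka) that checks the three points above. */
final class JaasConfigCheck {

    // username=... and password=... must each be wrapped in a matching pair of
    // double quotes or single quotes. Assumption: the values contain no quotes.
    private static final Pattern USERNAME =
            Pattern.compile("\\busername=(\"[^\"]*\"|'[^']*')");
    private static final Pattern PASSWORD =
            Pattern.compile("\\bpassword=(\"[^\"]*\"|'[^']*')");

    /** Returns true if both credentials are quoted and the value ends with ';'. */
    static boolean looksWellFormed(String jaasConfig) {
        String trimmed = jaasConfig.trim();
        return USERNAME.matcher(trimmed).find()
                && PASSWORD.matcher(trimmed).find()
                && trimmed.endsWith(";");
    }

    public static void main(String[] args) {
        String module = "org.apache.kafka.common.security.plain.PlainLoginModule required ";
        // Quoted credentials and a trailing semicolon.
        String good = module + "username=\"USERNAME\" password=\"PASSWORD\";";
        // Same shape as the value shown in the question: the closing quote
        // after PASSWORD and the trailing semicolon are missing.
        String bad = module + "username=\"USERNAME\" password=\"PASSWORD";
        System.out.println("good: " + looksWellFormed(good));
        System.out.println("bad:  " + looksWellFormed(bad));
    }
}
```

A check like this only catches formatting slips. It says nothing about whether the broker will accept the credentials.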
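Important: if the password contains special characters, this can cause problems, because Camel's string parser may confuse those characters with other parameters in the route. If your password contains such characters, you can express saslJaasConfig as RAW: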
String saslJaasConfig = "RAW(org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";)";
In my case, this solved the problem. It also explains why using the DSL fixes it: with the DSL, the values are set declaratively and no string parsing is needed.
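To show how the RAW wrapper fits into the string-based endpoint from the question, here is a minimal sketch that assembles the whole URI with plain string concatenation. KafkaSaslUriSketch, plainJaasConfig and endpointUri are hypothetical names used only for illustration; the option names are the ones from the question, and the sketch assumes that neither the username nor the password contains a double quote or a closing parenthesis.

```java
/** Hypothetical helper for illustration; only the RAW(...) syntax itself comes from Camel. */
final class KafkaSaslUriSketch {

    /** Builds a PLAIN JAAS config with double-quoted credentials and a trailing semicolon. */
    static String plainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";";
    }

    /** Builds the endpoint URI from the question, with the JAAS value wrapped in RAW(...). */
    static String endpointUri(String topic, String brokers, String jaasConfig) {
        return "kafka:" + topic
                + "?brokers=" + brokers
                + "&saslMechanism=PLAIN"
                + "&securityProtocol=SASL_SSL"
                + "&sslEndpointAlgorithm=HTTPS"
                // RAW(...) asks Camel to take the value as-is instead of parsing it.
                + "&saslJaasConfig=RAW(" + jaasConfig + ")";
    }

    public static void main(String[] args) {
        // A made-up password with characters that also have a meaning in a URI.
        String jaas = plainJaasConfig("USERNAME", "p+ss&word");
        System.out.println(endpointUri("foo-topic", "the-kafka-host.confluent.cloud:9092", jaas));
    }
}
```

The resulting string would be passed to .to(...) in place of the concatenation used in the question. Whether it authenticates still depends on the credentials being valid for the broker.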
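I think there is a problem with the truststore file. You have to specify the truststore file on the client side; please check the sslTruststoreLocation property.
E.g.: sslTruststoreLocation=E:\\test\\kafka.client.truststore.jks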
For the Apache Camel Kafka component with SASL security, you need the following security configuration:
camel.component.kafka.securityProtocol=SASL_SSL
camel.component.kafka.saslMechanism=SCRAM-SHA-512
camel.component.kafka.saslJaasConfig=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
For more details, see this article: https://medium.com/@nihatonder87/kafka-client-for-axual-with-camel-kafka-component-bb1b89acd638