即使我提供了错误的 clientId clientSecret 范围
clientId="1test_kafka" \
clientSecret="xxx" \
scope="1test_kafka_access" \
unsecuredLoginStringClaim_sub="thePrincipalName";
Kafka 工作正常,我无法理解如何?
我已经使用 Keycloak 设置了 OAUTH 2 提供商
POST https://localhost:8443/realms/myorg/protocol/openid-connect/token
Content-Type: application/x-www-form-urlencoded
grant_type=client_credentials&client_id=test_kafka&client_secret=xxx&scope=test_kafka_access
我得到了代币
HTTP/1.1 200 OK
Referrer-Policy: no-referrer
X-Frame-Options: SAMEORIGIN
Strict-Transport-Security: max-age=31536000; includeSubDomains
Cache-Control: no-store
X-Content-Type-Options: nosniff
Set-Cookie: KC_RESTART=; Version=1; Expires=Thu, 01-Jan-1970 00:00:10 GMT; Max-Age=0; Path=/realms/myorg/; HttpOnly
Pragma: no-cache
X-XSS-Protection: 1; mode=block
Content-Type: application/json
connection: close
content-length: 1439
{
"access_token": "token",
"expires_in": 300,
"refresh_expires_in": 0,
"token_type": "Bearer",
"not-before-policy": 0,
"scope": "email test_kafka_access profile"
}
如果我去 JWT.io 看看
现在这是
Kafka 服务器.properties
# Basic configurations
broker.id=0
num.network.threads=3
num.io.threads=8
log.dirs=/tmp/kafka-logs
num.partitions=1
# Zookeeper settings
zookeeper.connect=localhost:2181
# SASL/OAUTHBEARER mechanism
listeners=SASL_SSL://:9092
advertised.listeners=SASL_SSL://localhost:9092
inter.broker.listener.name=SASL_SSL
listener.name.sasl_ssl.sasl.enabled.mechanisms=OAUTHBEARER
# Inter-broker SASL mechanism configuration
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
# JAAS configuration for OAuth2
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="1test_kafka" \
clientSecret="xxx" \
scope="1test_kafka_access" \
unsecuredLoginStringClaim_sub="thePrincipalName";
# JWKS endpoint and audience/issuer settings
listener.name.sasl_ssl.sasl.oauthbearer.jwks.endpoint.url=https://localhost:8443/realms/myorg/protocol/openid-connect/certs
listener.name.sasl_ssl.sasl.oauthbearer.expected.audience=account
listener.name.sasl_ssl.sasl.oauthbearer.expected.issuer=https://localhost:8443/realms/myorg
# SSL configurations for secure communication
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit
# Enable detailed logging (optional)
log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
#log4j.logger.org.apache.kafka=DEBUG
如果您看到我通过添加 1 提供了错误的 clientID 与 clientSecret 相同,添加 1 添加 1 来扩大范围。
但是 kafka 仍然运行良好,我能够消费和生成消息
这是我的 Producer.config
bootstrap.servers=localhost:9092
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
# JAAS configuration for the producer
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
unsecuredLoginStringClaim_sub="thePrincipalName";
# \
# clientId="test_kafka" \
# clientSecret="xxx" \
# scope="test_kafka_access";
# Additional producer settings
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# SSL configurations
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit
这是我的consumer.config
# Consumer properties
bootstrap.servers=localhost:9092
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
# JAAS configuration for the consumer
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
unsecuredLoginStringClaim_sub="thePrincipalName" \
clientId="test_kafka" \
clientSecret="xxx" \
scope="test_kafka_access";
# Additional consumer settings
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# SSL configurations
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit
这是控制台输出
[2024-12-05 17:29:07,548] INFO AdminClientConfig values:
auto.include.jmx.reporter = true
bootstrap.controllers = []
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id = confluent-telemetry-reporter-local-producer
confluent.lkc.id = null
confluent.metrics.reporter.bootstrap.servers = kafka-0:9071
confluent.proxy.protocol.client.address = null
confluent.proxy.protocol.client.mode = PROXY
confluent.proxy.protocol.client.port = null
confluent.proxy.protocol.client.version = NONE
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
enable.metrics.push = true
host.resolver.class = class org.apache.kafka.clients.DefaultHostResolver
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 500
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = OAUTHBEARER
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = [account]
sasl.oauthbearer.expected.issuer = https://localhost:8443/realms/myorg
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = https://localhost:8443/realms/myorg/protocol/openid-connect/certs
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
(org.apache.kafka.clients.admin.AdminClientConfig)
[2024-12-05 17:29:07,582] INFO These configurations '[compression.type, enable.idempotence, acks, sasl.oauthbearer.expected.issuer, sasl.oauthbearer.expected.audience, key.serializer, max.request.size, value.serializer, interceptor.classes, max.in.flight.requests.per.connection, sasl.oauthbearer.jwks.endpoint.url, linger.ms]' were supplied but are not used yet. (org.apache.kafka.clients.admin.AdminClientConfig)
[2024-12-05 17:29:07,582] INFO Kafka version: 7.7.1-ce (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,582] INFO Kafka commitId: e42eb4aa8aba3e1cdae0b5edc71b43747ca85a6d (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,582] INFO Kafka startTimeMs: 1733399947582 (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,595] INFO Successfully validated token with principal thePrincipalName: {sub=thePrincipalName, exp=1.733403487269E9, iat=1.733399887269E9} (org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)
[2024-12-05 17:29:07,611] INFO Successfully validated token with principal thePrincipalName: {sub=thePrincipalName, exp=1.733403487269E9, iat=1.733399887269E9} (org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)
[2024-12-05 17:29:07,615] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='_confluent-telemetry-metrics', numPartitions=12, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='max.message.bytes', value='10485760'), CreateableTopicConfig(name='message.timestamp.type', value='CreateTime'), CreateableTopicConfig(name='min.insync.replicas', value='1'), CreateableTopicConfig(name='retention.ms', value='259200000'), CreateableTopicConfig(name='segment.ms', value='14400000'), CreateableTopicConfig(name='retention.bytes', value='-1')], linkName=null, mirrorTopic=null, sourceTopicId=AAAAAAAAAAAAAAAAAAAAAA, mirrorStartOffsetSpec=-9223372036854775808, mirrorStartOffsets=[]) (kafka.server.ZkAdminManager)
[2024-12-05 17:28:24,577] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='_confluent-command', numPartitions=1, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='min.insync.replicas', value='2')], linkName=null, mirrorTopic=null, sourceTopicId=AAAAAAAAAAAAAAAAAAAAAA, mirrorStartOffsetSpec=-9223372036854775808, mirrorStartOffsets=[]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
在这里你可以看到我能够访问消费和生产
我不确定是否使用了oauth?
**设置使用 OAUTH 2 grant_type=client_credential 的 kafka 的正确方法是什么? **
不确定是否使用了oauth
就在那里......阅读日志
INFO Successfully validated token with principal thePrincipalName: {sub=thePrincipalName, ...}
(org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)