Integrating Kafka with OAuth 2 grant_type=client_credentials


Even though I supplied a wrong clientId, clientSecret, and scope:

clientId="1test_kafka" \
    clientSecret="xxx" \
    scope="1test_kafka_access" \
    unsecuredLoginStringClaim_sub="thePrincipalName";

Kafka works just fine, and I cannot understand how.

I have set up the OAuth 2 provider using Keycloak. When I request a token:

POST https://localhost:8443/realms/myorg/protocol/openid-connect/token
Content-Type: application/x-www-form-urlencoded

grant_type=client_credentials&client_id=test_kafka&client_secret=xxx&scope=test_kafka_access

I get a token back:

HTTP/1.1 200 OK
Referrer-Policy: no-referrer
X-Frame-Options: SAMEORIGIN
Strict-Transport-Security: max-age=31536000; includeSubDomains
Cache-Control: no-store
X-Content-Type-Options: nosniff
Set-Cookie: KC_RESTART=; Version=1; Expires=Thu, 01-Jan-1970 00:00:10 GMT; Max-Age=0; Path=/realms/myorg/; HttpOnly
Pragma: no-cache
X-XSS-Protection: 1; mode=block
Content-Type: application/json
connection: close
content-length: 1439

{
  "access_token": "token",
  "expires_in": 300,
  "refresh_expires_in": 0,
  "token_type": "Bearer",
  "not-before-policy": 0,
  "scope": "email test_kafka_access profile"
}

If I paste the token into JWT.io, the decoded header and claims look as expected.

[decoded JWT screenshot]
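
(The same inspection can be done locally without JWT.io; a minimal Java sketch, assuming the raw access_token string is passed as args[0]:)

import java.util.Base64;

public class DecodeJwt {
    public static void main(String[] args) {
        // A JWT is header.payload.signature, each segment Base64URL-encoded.
        // This merely decodes and prints; it does NOT verify the signature.
        String[] parts = args[0].split("\\.");
        Base64.Decoder dec = Base64.getUrlDecoder();
        System.out.println(new String(dec.decode(parts[0]))); // header
        System.out.println(new String(dec.decode(parts[1]))); // claims
    }
}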

Now here is my Kafka server.properties:

# Basic configurations
broker.id=0
num.network.threads=3
num.io.threads=8
log.dirs=/tmp/kafka-logs
num.partitions=1

# Zookeeper settings
zookeeper.connect=localhost:2181

# SASL/OAUTHBEARER mechanism
listeners=SASL_SSL://:9092
advertised.listeners=SASL_SSL://localhost:9092
inter.broker.listener.name=SASL_SSL
listener.name.sasl_ssl.sasl.enabled.mechanisms=OAUTHBEARER

# Inter-broker SASL mechanism configuration
sasl.mechanism.inter.broker.protocol=OAUTHBEARER

# JAAS configuration for OAuth2
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="1test_kafka" \
    clientSecret="xxx" \
    scope="1test_kafka_access" \
    unsecuredLoginStringClaim_sub="thePrincipalName";

# JWKS endpoint and audience/issuer settings
listener.name.sasl_ssl.sasl.oauthbearer.jwks.endpoint.url=https://localhost:8443/realms/myorg/protocol/openid-connect/certs
listener.name.sasl_ssl.sasl.oauthbearer.expected.audience=account
listener.name.sasl_ssl.sasl.oauthbearer.expected.issuer=https://localhost:8443/realms/myorg

# SSL configurations for secure communication
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit

# Enable detailed logging (optional)
log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
#log4j.logger.org.apache.kafka=DEBUG

As you can see, I deliberately made the clientId wrong by prefixing it with 1, did the same with the scope, and the clientSecret is wrong as well.

Yet Kafka still runs fine, and I am able to produce and consume messages.

Here is my producer.config:

bootstrap.servers=localhost:9092
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER

# JAAS configuration for the producer
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    unsecuredLoginStringClaim_sub="thePrincipalName";
    #  \
    # clientId="test_kafka" \
    # clientSecret="xxx" \
    # scope="test_kafka_access";

# Additional producer settings
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# SSL configurations
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit


And here is my consumer.config:

# Consumer properties
bootstrap.servers=localhost:9092
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER

# JAAS configuration for the consumer
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    unsecuredLoginStringClaim_sub="thePrincipalName" \
    clientId="test_kafka" \
    clientSecret="xxx" \
    scope="test_kafka_access";

# Additional consumer settings
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# SSL configurations
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit
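
(The consumer config is used likewise, under the same assumptions:)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test \
    --from-beginning --consumer.config consumer.config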

Here is the console output:


[2024-12-05 17:29:07,548] INFO AdminClientConfig values: 
    auto.include.jmx.reporter = true
    bootstrap.controllers = []
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = confluent-telemetry-reporter-local-producer
    confluent.lkc.id = null
    confluent.metrics.reporter.bootstrap.servers = kafka-0:9071
    confluent.proxy.protocol.client.address = null
    confluent.proxy.protocol.client.mode = PROXY
    confluent.proxy.protocol.client.port = null
    confluent.proxy.protocol.client.version = NONE
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    enable.metrics.push = true
    host.resolver.class = class org.apache.kafka.clients.DefaultHostResolver
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.max.ms = 1000
    retry.backoff.ms = 500
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = OAUTHBEARER
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = [account]
    sasl.oauthbearer.expected.issuer = https://localhost:8443/realms/myorg
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = https://localhost:8443/realms/myorg/protocol/openid-connect/certs
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = SASL_SSL
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = /Users/sam/driveD/keystoret-truststore/keystore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = /Users/sam/driveD/keystoret-truststore/truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
 (org.apache.kafka.clients.admin.AdminClientConfig)

[2024-12-05 17:29:07,582] INFO These configurations '[compression.type, enable.idempotence, acks, sasl.oauthbearer.expected.issuer, sasl.oauthbearer.expected.audience, key.serializer, max.request.size, value.serializer, interceptor.classes, max.in.flight.requests.per.connection, sasl.oauthbearer.jwks.endpoint.url, linger.ms]' were supplied but are not used yet. (org.apache.kafka.clients.admin.AdminClientConfig)
[2024-12-05 17:29:07,582] INFO Kafka version: 7.7.1-ce (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,582] INFO Kafka commitId: e42eb4aa8aba3e1cdae0b5edc71b43747ca85a6d (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,582] INFO Kafka startTimeMs: 1733399947582 (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,595] INFO Successfully validated token with principal thePrincipalName: {sub=thePrincipalName, exp=1.733403487269E9, iat=1.733399887269E9} (org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)
[2024-12-05 17:29:07,611] INFO Successfully validated token with principal thePrincipalName: {sub=thePrincipalName, exp=1.733403487269E9, iat=1.733399887269E9} (org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)
[2024-12-05 17:29:07,615] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='_confluent-telemetry-metrics', numPartitions=12, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='max.message.bytes', value='10485760'), CreateableTopicConfig(name='message.timestamp.type', value='CreateTime'), CreateableTopicConfig(name='min.insync.replicas', value='1'), CreateableTopicConfig(name='retention.ms', value='259200000'), CreateableTopicConfig(name='segment.ms', value='14400000'), CreateableTopicConfig(name='retention.bytes', value='-1')], linkName=null, mirrorTopic=null, sourceTopicId=AAAAAAAAAAAAAAAAAAAAAA, mirrorStartOffsetSpec=-9223372036854775808, mirrorStartOffsets=[]) (kafka.server.ZkAdminManager)




[2024-12-05 17:28:24,577] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='_confluent-command', numPartitions=1, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='min.insync.replicas', value='2')], linkName=null, mirrorTopic=null, sourceTopicId=AAAAAAAAAAAAAAAAAAAAAA, mirrorStartOffsetSpec=-9223372036854775808, mirrorStartOffsets=[]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

Here you can see that I am able to consume and produce:

[console output screenshot]

I am not sure whether OAuth is being used at all.

**What is the correct way to set up Kafka with OAuth 2 grant_type=client_credentials?**

java apache-kafka oauth-2.0 keycloak sasl
1 Answer

"I am not sure whether OAuth is being used at all."

It is right there... read the logs:

INFO Successfully validated token with principal thePrincipalName: {sub=thePrincipalName, ...}
 (org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)
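
The handler named in that line, OAuthBearerUnsecuredValidatorCallbackHandler, is Kafka's *unsecured* validator: it parses the bearer token but checks no signature, issuer, or audience. It is selected because the JAAS configuration contains the unsecuredLoginStringClaim_sub option, which likewise makes the login side mint an unsigned token locally instead of calling Keycloak, so the wrong clientId/clientSecret/scope are never sent anywhere.

To actually enforce OAuth with grant_type=client_credentials, drop the unsecured* options and wire in the secured callback handlers from KIP-768. A minimal sketch, assuming Kafka 3.1+/Confluent 7.x class names and the Keycloak endpoints shown above:

# Broker (server.properties): validate incoming tokens against the JWKS
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="test_kafka" \
    clientSecret="xxx" \
    scope="test_kafka_access";
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
listener.name.sasl_ssl.sasl.oauthbearer.jwks.endpoint.url=https://localhost:8443/realms/myorg/protocol/openid-connect/certs
listener.name.sasl_ssl.sasl.oauthbearer.expected.issuer=https://localhost:8443/realms/myorg
listener.name.sasl_ssl.sasl.oauthbearer.expected.audience=account

# Inter-broker traffic also uses OAUTHBEARER, so the broker needs the login
# side as well (it acts as its own client):
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://localhost:8443/realms/myorg/protocol/openid-connect/token

# Client (producer.config / consumer.config): fetch a real token from Keycloak
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://localhost:8443/realms/myorg/protocol/openid-connect/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="test_kafka" \
    clientSecret="xxx" \
    scope="test_kafka_access";

With the secured handlers in place, a bad clientId, clientSecret, or scope fails at SASL login instead of silently working.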