我正在使用 Kafka(kafka_2.13-3.9.0),并通过 Okta 配置 OAUTHBEARER 认证机制(mechanism)。
配置 broker(代理节点)的标准方法是什么?
启动 Kafka 服务器时出现以下错误:
./bin/kafka-server-start.sh config/server.properties
[2024-12-16 23:31:44,482] ERROR No principal name in JWT claim: sub (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
java.io.IOException: No principal name in JWT claim: sub
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:166)
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:317)
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:302)
at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:679)
at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:677)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:677)
at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:205)
at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:151)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:64)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:107)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:173)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:193)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:82)
at kafka.server.NodeToControllerChannelManagerImpl.buildNetworkClient$1(NodeToControllerChannelManager.scala:162)
at kafka.server.NodeToControllerChannelManagerImpl.newRequestThread(NodeToControllerChannelManager.scala:202)
at kafka.server.NodeToControllerChannelManagerImpl.<init>(NodeToControllerChannelManager.scala:141)
at kafka.server.KafkaServer.startup(KafkaServer.scala:357)
at kafka.Kafka$.main(Kafka.scala:112)
at kafka.Kafka.main(Kafka.scala)
Caused by: org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException: No principal name in JWT claim: sub
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:220)
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:164)
... 21 more
Caused by: org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException: No principal name in JWT claim: sub
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.<init>(OAuthBearerUnsecuredJws.java:109)
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:212)
... 22 more
server.properties(服务器配置)如下:
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
inter.broker.listener.name=SASL_PLAINTEXT
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT
# SASL and OAuth2 Configuration
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
listener.name.sasl_plaintext.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
# JWKS endpoint for token validation
sasl.oauthbearer.jwks.endpoint.url=https://dev-someId.okta.com/oauth2/default/v1/keys
# JAAS Configuration for OAuth2 (Inline in server.properties for simplicity)
listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="xxxxxxxxxx" \
clientSecret="xxxxxxxxxvxvxvxvxvvxvxvxvxvvxvxvxvxvxvxvxv" \
scope="kafka" \
grantType="client_credentials" \
tokenEndpointUrl="https://dev-someid.okta.com/oauth2/default/v1/token";
如果我解码令牌,得到的内容如下:
{
"ver": 1,
"jti": "AT.XxXY_2aH9Oxz5IC-QruS4Iqhh_7TrfCM6Sj3lzDnKDY",
"iss": "https://dev-someId.okta.com/oauth2/default",
"aud": "api://default",
"iat": 1734369931,
"exp": 1734373531,
"cid": "0oalmwzen2tCuDxB05d7",
"scp": [
"kafka"
],
"sub": "0oalmwzen2tCuDxB05d7"
}
我不确定是什么导致了这个错误
使用 Okta 配置 Kafka 的正确方法:
server.properties(服务器配置):
broker.id=0
log.dirs=/Users/omsairam/tools/kafka_2.13-3.9.0/data/kafka-logs
zookeeper.connect=localhost:2181
sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://localhost:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093
#Authorizer for ACL
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:<ProducerClientID>;
sasl.oauthbearer.token.endpoint.url=https://hostname/oauth2/default/v1/token
sasl.oauthbearer.jwks.endpoint.url=https://hostname/oauth2/default/v1/keys
sasl.oauthbearer.expected.audience=api://default
listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='<ProducerClientID>' clientSecret='<ProducerClientSecret>' scope='<scope>';
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
offsets.topic.replication.factor=1
kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
clientId="<ProducerClientID>"
clientSecret="<ProducerClientSecret>"
scope="<scope>";
};
启动服务器
export KAFKA_OPTS="-Djava.security.auth.login.config=/Users/omsairam/tools/kafka_2.13-3.9.0/config/kafka_server_jaas.conf"
./bin/kafka-server-start.sh config/server.properties
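这个配置对我有效。关键在于显式指定 login callback handler(org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler)以及 token endpoint URL;如果不指定,Kafka 会回退到默认的 OAuthBearerUnsecuredLoginCallbackHandler,也就是上面堆栈中报出 "No principal name in JWT claim: sub" 的那个类。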