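I want to consume EventHub messages in Azure Synapse using PySpark Structured Streaming, and someone pointed me to this project: https://github.com/alexott/databricks-playground/tree/main/kafka-eventhubs-aad-auth . Can it be modified to use certificate authentication instead of a secret, and if so, can someone guide me? I tried modifying the ServicePrincipalCredentialsAuth class shown below to use ClientCredentialFactory.createFromCertificate, but I could not get it to work.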
package net.alexott.demos.eventhubs_aad;

import com.microsoft.aad.msal4j.ClientCredentialFactory;
import com.microsoft.aad.msal4j.ConfidentialClientApplication;
import com.microsoft.aad.msal4j.IClientCredential;
import java.net.MalformedURLException;
import scala.collection.immutable.Map;

public class ServicePrincipalCredentialsAuth extends ServicePrincipalAuthBase {
    private final String clientSecret;
    private static final String AAD_CLIENT_SECRET_KEY = "aad_client_secret";

    public ServicePrincipalCredentialsAuth(Map<String, String> params) {
        super(params);
        clientSecret = params.get(AAD_CLIENT_SECRET_KEY).get();
    }

    @Override
    ConfidentialClientApplication getClient() throws MalformedURLException {
        IClientCredential credential = ClientCredentialFactory.createFromSecret(this.clientSecret);
        return ConfidentialClientApplication.builder(this.clientId, credential)
                .authority(this.authEndpoint)
                .build();
    }
}
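To modify the Java code for EventHub Spark Structured Streaming so that it uses certificate authentication instead of a client secret, you need to adjust the ServicePrincipalCredentialsAuth class to use ClientCredentialFactory.createFromCertificate instead of ClientCredentialFactory.createFromSecret. Below is the modified Java code with certificate authentication:
ServicePrincipalCredentialsAuth class: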
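import com.microsoft.aad.msal4j.ClientCredentialFactory;
import com.microsoft.aad.msal4j.ConfidentialClientApplication;
import com.microsoft.aad.msal4j.IClientCredential;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import scala.collection.immutable.Map;

public class ServicePrincipalCredentialsAuth extends ServicePrincipalAuthBase {
    private final String certificatePath;
    private final String certificatePassword;
    private static final String AAD_CERTIFICATE_PATH_KEY = "aad_certificate_path";
    private static final String AAD_CERTIFICATE_PASSWORD_KEY = "aad_certificate_password";

    public ServicePrincipalCredentialsAuth(Map<String, String> params) {
        super(params);
        certificatePath = params.get(AAD_CERTIFICATE_PATH_KEY).get();
        certificatePassword = params.get(AAD_CERTIFICATE_PASSWORD_KEY).get();
    }

    @Override
    ConfidentialClientApplication getClient() throws MalformedURLException {
        char[] password = certificatePassword.toCharArray();
        IClientCredential credential;
        // try-with-resources closes the PKCS#12 (PFX) file even if loading fails
        try (InputStream stream = new FileInputStream(certificatePath)) {
            KeyStore keyStore = KeyStore.getInstance("PKCS12");
            keyStore.load(stream, password);
            // Read the alias once so the key and the certificate come from the same entry
            String alias = keyStore.aliases().nextElement();
            PrivateKey privateKey = (PrivateKey) keyStore.getKey(alias, password);
            X509Certificate certificate = (X509Certificate) keyStore.getCertificate(alias);
            credential = ClientCredentialFactory.createFromCertificate(privateKey, certificate);
        } catch (GeneralSecurityException | IOException ex) {
            // getClient() may only throw MalformedURLException, so wrap the failure and keep its cause
            MalformedURLException wrapped = new MalformedURLException("Failed to load certificate: " + ex.getMessage());
            wrapped.initCause(ex);
            throw wrapped;
        }
        return ConfidentialClientApplication.builder(this.clientId, credential)
                .authority(this.authEndpoint)
                .build();
    }
}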
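One thing that can go wrong with the keystore handling above is that the first alias in a PFX file is not necessarily the entry holding the private key, because a PFX may also carry CA certificates. The following is a minimal sketch of a more defensive lookup using only the JDK. The class name `Pkcs12KeyMaterial` and its methods are hypothetical helpers for illustration and are not part of the linked project or of MSAL4J.

```java
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.util.Collections;
import java.util.Optional;

/** Hypothetical helper (not part of the linked project) for inspecting a PKCS#12 store. */
public final class Pkcs12KeyMaterial {
    private Pkcs12KeyMaterial() {}

    /** Loads a PKCS#12 keystore from the stream; the caller owns and closes the stream. */
    public static KeyStore load(InputStream in, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        // A null stream yields an empty keystore, as documented for KeyStore.load
        keyStore.load(in, password);
        return keyStore;
    }

    /** Returns the first alias that is a key entry, skipping certificate-only entries. */
    public static Optional<String> firstKeyAlias(KeyStore keyStore) throws KeyStoreException {
        for (String alias : Collections.list(keyStore.aliases())) {
            if (keyStore.isKeyEntry(alias)) {
                return Optional.of(alias);
            }
        }
        return Optional.empty();
    }
}
```

In `getClient()`, the alias found this way could then be passed to `keyStore.getKey(alias, password)` and `keyStore.getCertificate(alias)` before calling `ClientCredentialFactory.createFromCertificate`. An empty result is a clearer failure signal than a `NullPointerException` or `ClassCastException` further down.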
To retrieve secrets stored in Azure Key Vault, refer to this GitHub link.
Fetching the certificate from Azure Key Vault with Java:
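// Field declarations and imports (azure-identity, azure-security-keyvault-secrets, java.util.Base64, java.io.ByteArrayInputStream) are not shown
public ServicePrincipalCredentialsAuth(Map<String, String> params) {
    super(params); // clientId is inherited from ServicePrincipalAuthBase, as in the class above
    // scala.collection.immutable.Map.get returns an Option, so unwrap each value with get()
    this.keyVaultUrl = params.get(AAD_KEYVAULT_URL_KEY).get();
    this.tenantId = params.get(AAD_TENANT_ID_KEY).get(); // hypothetical key; ClientSecretCredential needs a tenant ID
    this.clientSecret = params.get(AAD_CLIENT_SECRET_KEY).get();
    this.certificateName = params.get(AAD_CERTIFICATE_NAME_KEY).get();
}

@Override
ConfidentialClientApplication getClient() throws MalformedURLException {
    try {
        // Authenticate to Key Vault with the client secret; the certificate cannot be used to download itself
        TokenCredential vaultCredential = new ClientSecretCredentialBuilder()
                .tenantId(tenantId)
                .clientId(clientId)
                .clientSecret(clientSecret)
                .build();
        // CertificateClient returns only the public part; the private key is exposed through the secret of the same name
        SecretClient secretClient = new SecretClientBuilder()
                .vaultUrl(keyVaultUrl)
                .credential(vaultCredential)
                .buildClient();
        // Assumes a PKCS#12 (PFX) certificate, which Key Vault returns Base64-encoded with an empty password
        byte[] pfx = Base64.getDecoder().decode(secretClient.getSecret(certificateName).getValue());
        // MSAL needs the private key, not certificate.getPublicKey(); this overload reads both from the PFX
        IClientCredential msalCredential = ClientCredentialFactory.createFromCertificate(new ByteArrayInputStream(pfx), "");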