EventHub Spark 使用证书身份验证的结构化流

问题描述 投票:0回答:1

我想在 Azure Synapse 中使用 pyspark 使用结构化流来进行 EventHub 消息消费,有人向我指出了这个项目 https://github.com/alexott/databricks-playground/tree/main/kafka-eventhubs-aad-auth 。如果可以修改为使用证书而不是秘密身份验证,有人可以指导我吗?我尝试修改如下所示的 ServicePrincipalCredentialsAuth 类以使用 ClientCredentialFactory.createFromCertificate 但我无法使其工作。

package net.alexott.demos.eventhubs_aad;

import com.microsoft.aad.msal4j.ClientCredentialFactory;
import com.microsoft.aad.msal4j.ConfidentialClientApplication;
import com.microsoft.aad.msal4j.IClientCredential;


import java.net.MalformedURLException;
import scala.collection.immutable.Map;

public class ServicePrincipalCredentialsAuth extends ServicePrincipalAuthBase {
    private final String clientSecret;

    private static final String AAD_CLIENT_SECRET_KEY = "aad_client_secret";

    public ServicePrincipalCredentialsAuth(Map<String, String> params) {
        super(params);
        clientSecret = params.get(AAD_CLIENT_SECRET_KEY).get();
    }

    @Override
    ConfidentialClientApplication getClient() throws MalformedURLException {
        IClientCredential credential = ClientCredentialFactory.createFromSecret(this.clientSecret);
        return ConfidentialClientApplication.builder(this.clientId, credential)
            .authority(this.authEndpoint)
            .build();
    }

}
apache-spark pyspark spark-streaming azure-synapse azure-eventhub
1个回答
0
投票

要使用证书身份验证而不是客户端密钥修改 EventHub Spark 结构化流的 Java 代码,您需要调整

ServicePrincipalCredentialsAuth
类以使用
ClientCredentialFactory.createFromCertificate
而不是
ClientCredentialFactory.createFromSecret
。下面是修改后的包含证书身份验证的 Java 代码:

ServicePrincipalCredentialsAuth 类:



public class ServicePrincipalCredentialsAuth extends ServicePrincipalAuthBase {
    private final String certificatePath;
    private final String certificatePassword;

    private static final String AAD_CERTIFICATE_PATH_KEY = "aad_certificate_path";
    private static final String AAD_CERTIFICATE_PASSWORD_KEY = "aad_certificate_password";

    public ServicePrincipalCredentialsAuth(Map<String, String> params) {
        super(params);
        certificatePath = params.get(AAD_CERTIFICATE_PATH_KEY).get();
        certificatePassword = params.get(AAD_CERTIFICATE_PASSWORD_KEY).get();
    }

    @Override
    ConfidentialClientApplication getClient() throws MalformedURLException {
        try {
            KeyStore keyStore = KeyStore.getInstance("PKCS12");
            FileInputStream stream = new FileInputStream(certificatePath);
            keyStore.load(stream, certificatePassword.toCharArray());
            PrivateKey privateKey = (PrivateKey) keyStore.getKey(keyStore.aliases().nextElement(), certificatePassword.toCharArray());
            X509Certificate certificate = (X509Certificate) keyStore.getCertificate(keyStore.aliases().nextElement());
            IClientCredential credential = ClientCredentialFactory.createFromCertificate(privateKey, certificate);
            return ConfidentialClientApplication.builder(this.clientId, credential)
                    .authority(this.authEndpoint)
                    .build();
        } catch (KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException | UnrecoverableKeyException ex) {
            throw new MalformedURLException("Failed to load certificate: " + ex.getMessage());
        }
    }
}



要检索存储在 Azure Key Vault 中的机密,请参阅此 GitHub

用于使用 Java 获取证书的 Azure Key Vault:

   public ServicePrincipalCredentialsAuth(Map<String, String> params) {
        super(params);
        this.keyVaultUrl = params.get(AAD_KEYVAULT_URL_KEY);
        this.clientId = params.get(AAD_CLIENT_ID_KEY);
        this.clientSecret = params.get(AAD_CLIENT_SECRET_KEY);
        this.certificateName = params.get(AAD_CERTIFICATE_NAME_KEY);
    }

    @Override
    ConfidentialClientApplication getClient() throws MalformedURLException {
        try {
           
            ClientCertificateCredential credential = new ClientCertificateCredentialBuilder()
                    .clientId(clientId)
                    .clientCertificate(getClientCertificate())
                    .vaultUrl(keyVaultUrl)
                    .build();

            CertificateClient certificateClient = new CertificateClientBuilder()
                    .credential(credential)
                    .vaultUrl(keyVaultUrl)
                    .buildClient();

            CertificateWithPolicy certificateWithPolicy = certificateClient.getCertificate(certificateName);

            X509Certificate certificate = certificateWithPolicy.getCertificate();

          
            IClientCredential msalCredential = ClientCredentialFactory.createFromCertificate(certificate.getPublicKey(), certificate);

enter image description here

  • 请参阅此链接,了解 Azure Synapse 中的 Azure 密钥保管库实例。
© www.soinside.com 2019 - 2024. All rights reserved.