如何在 fs.azure.account.oauth2.msi.endpoint 中动态设置“api-version”

问题描述 投票:0回答:1

目前我正在通过 pyspark 库使用 hadoop-azure-3.4.1 连接到 ABFS。根据文档 - https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity - 可以选择使用托管身份进行身份验证。此功能在 Azure VM 上完美运行,但在我的本地服务器上失败并出现以下错误:

Caused by: org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://127.0.0.1:40342/metadata/identity/oauth2/token' AADToken: HTTP connection to http://127.0.0.1:40342/metadata/identity/oauth2/token failed for getting token from AzureAD.; contentType='application/json'; response '{"error":"invalid_request","error_description":"The api-version '2018-02-01' could not be found. Supported are 2021-02-01 2020-06-01 2019-11-01 2019-08-15","error_codes":[404],"timestamp":"2024-11-20 00:21:23.47342756 +0000 UTC m=+4427.811320871","trace_id":"","correlation_id":"efee3d6c-ddde-465c-96dd-d0c68c7f7ebd"}'

http://127.0.0.1:40342/metadata/identity/oauth2/token是用于在通过Arc连接到Azure的计算机上获取令牌的端点:通过ARC使用MI的Azure官方文档

基于hadoop-azure代码,看起来这个版本是硬编码的:https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/ hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java#L176

我的Python代码示例:

from pyspark.sql import SparkSession

# Spark add-ons
path_to_hadoop_azure_jar = "/opt/hadoop-azure-3.4.1.jar"
path_to_hadoop_common_jar = "/opt/hadoop-common-3.4.1.jar"
path_to_azure_storage_jar = "/opt/azure-storage-8.6.6.jar"
path_to_azure_datalake_jar = "/opt/hadoop-azure-datalake-3.4.1.jar"

# ABFS variables
account_name = "pilotdbwsa"
container_name = "pilot-dbw"
container_path = "test1-test1-arc"
abfs_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{container_path}"

# Spark Session setup
spark = SparkSession.builder.appName("AzureDataRead") \
    .config("spark.jars", f"{path_to_hadoop_common_jar},{path_to_hadoop_azure_jar},{path_to_azure_storage_jar},{path_to_azure_datalake_jar}") \
    .getOrCreate()
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")

spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.msi.endpoint.{account_name}.dfs.core.windows.net", "http://127.0.0.1:40342/metadata/identity/oauth2/token")

# Logging
spark.sparkContext.setLogLevel("DEBUG")

# Create a simple DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Write the DataFrame to ABFS as Parquet
try:
    df.write.parquet(abfs_path)
    print(f"Parquet file successfully written to {abfs_path}")
except Exception as e:
    print(f"Error writing Parquet file: {e}")

问题:是否可以通过编程方式覆盖 api-version 或在 Spark 配置中指定它?

apache-spark hadoop pyspark azure-arc
1个回答
0
投票

是的,您是对的,从支持 arc 的服务器获取访问令牌与常规服务器不同。

它调用中间密钥的端点,并通过基本授权生成访问令牌。

首次向端点发出请求时,它应该处理异常并使用该元数据发出进一步请求来获取令牌,但在 hadoop azure jar 中,即使使用支持的 Api 版本,也不会处理引发错误。

因此,以下是可能的解决方法。

  1. 在java中创建一个自定义类,它具有与here给出的代码类似的功能,并在

    spark.conf
    中使用该类,就像here

    提到的自定义令牌提供程序一样
  2. 现在,您可以使用 pandas 来读取/写入文件,并使用提供默认凭据的存储选项。

首先,运行以下命令。

pip install fsspec,adlfs

代码:

import pandas as pd

strg_opt = {'account_name': "jadls", 'anon': False}
p_df = pd.read_csv("abfs://data/csv/bulk.csv" ,storage_options=strg_opt)

df = spark.createDataFrame(p_df)
df.show()
p_df[["Date","Symbol"]].to_csv("abfs://data/csv/new.csv",storage_options=strg_opt)

在这里,将

anon:False
传递给
storage_options
使用默认凭证(托管身份)并从中创建 Spark 数据帧。

接下来,路径应如下格式

abfs://{CONTAINER}/{FOLDER}/filename.csv

输出:

enter image description here

并在存储帐户中

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.