目前我正在通过 pyspark 库使用 hadoop-azure-3.4.1 连接到 ABFS。根据文档 - https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity - 可以选择使用托管身份进行身份验证。此功能在 Azure VM 上完美运行,但在我的本地服务器上失败并出现以下错误:
Caused by: org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://127.0.0.1:40342/metadata/identity/oauth2/token' AADToken: HTTP connection to http://127.0.0.1:40342/metadata/identity/oauth2/token failed for getting token from AzureAD.; contentType='application/json'; response '{"error":"invalid_request","error_description":"The api-version '2018-02-01' could not be found. Supported are 2021-02-01 2020-06-01 2019-11-01 2019-08-15","error_codes":[404],"timestamp":"2024-11-20 00:21:23.47342756 +0000 UTC m=+4427.811320871","trace_id":"","correlation_id":"efee3d6c-ddde-465c-96dd-d0c68c7f7ebd"}'
http://127.0.0.1:40342/metadata/identity/oauth2/token是用于在通过Arc连接到Azure的计算机上获取令牌的端点:通过ARC使用MI的Azure官方文档。
基于hadoop-azure代码,看起来这个版本是硬编码的:https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/ hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java#L176
我的Python代码示例:
from pyspark.sql import SparkSession
# Spark add-ons
path_to_hadoop_azure_jar = "/opt/hadoop-azure-3.4.1.jar"
path_to_hadoop_common_jar = "/opt/hadoop-common-3.4.1.jar"
path_to_azure_storage_jar = "/opt/azure-storage-8.6.6.jar"
path_to_azure_datalake_jar = "/opt/hadoop-azure-datalake-3.4.1.jar"
# ABFS variables
account_name = "pilotdbwsa"
container_name = "pilot-dbw"
container_path = "test1-test1-arc"
abfs_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{container_path}"
# Spark Session setup
spark = SparkSession.builder.appName("AzureDataRead") \
.config("spark.jars", f"{path_to_hadoop_common_jar},{path_to_hadoop_azure_jar},{path_to_azure_storage_jar},{path_to_azure_datalake_jar}") \
.getOrCreate()
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.msi.endpoint.{account_name}.dfs.core.windows.net", "http://127.0.0.1:40342/metadata/identity/oauth2/token")
# Logging
spark.sparkContext.setLogLevel("DEBUG")
# Create a simple DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
# Write the DataFrame to ABFS as Parquet
try:
df.write.parquet(abfs_path)
print(f"Parquet file successfully written to {abfs_path}")
except Exception as e:
print(f"Error writing Parquet file: {e}")
问题:是否可以通过编程方式覆盖 api-version 或在 Spark 配置中指定它?
是的,您是对的,从支持 arc 的服务器获取访问令牌与常规服务器不同。
它调用中间密钥的端点,并通过基本授权生成访问令牌。
首次向端点发出请求时,它应该处理异常并使用该元数据发出进一步请求来获取令牌,但在 hadoop azure jar 中,即使使用支持的 Api 版本,也不会处理引发错误。
因此,以下是可能的解决方法。
在java中创建一个自定义类,它具有与here给出的代码类似的功能,并在
spark.conf
中使用该类,就像here提到的自定义令牌提供程序一样
现在,您可以使用 pandas 来读取/写入文件,并使用提供默认凭据的存储选项。
首先,运行以下命令。
pip install fsspec,adlfs
代码:
import pandas as pd
strg_opt = {'account_name': "jadls", 'anon': False}
p_df = pd.read_csv("abfs://data/csv/bulk.csv" ,storage_options=strg_opt)
df = spark.createDataFrame(p_df)
df.show()
p_df[["Date","Symbol"]].to_csv("abfs://data/csv/new.csv",storage_options=strg_opt)
在这里,将
anon:False
传递给 storage_options
使用默认凭证(托管身份)并从中创建 Spark 数据帧。
接下来,路径应如下格式
abfs://{CONTAINER}/{FOLDER}/filename.csv
输出:
并在存储帐户中