I'm trying to load an XML file with Databricks. My environment is Azure Databricks 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12).
Here is the code that fails:
# Load the specified XML file
single_file_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "Tag").load(specific_file_path)
)

# # Show a sample of the data
single_file_df.show(truncate=False)
The error is:
Py4JJavaError: An error occurred while calling o457.load.
: Failure to initialize configuration for storage account [REDACTED].dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.key
File <command-1483821906694786>, line 4
1 # Load the specified XML file
2 single_file_df = (
3 spark.read.format("com.databricks.spark.xml")
----> 4 .option("rowTag", "Tag").load(specific_file_path)
5 )
7 # # Show a sample of the data
8 single_file_df.show(truncate=False)
What I have checked so far: it seems related to the library com.databricks:spark-xml_2.12:0.15.0, but I don't understand this error. I don't want to install or use the Blob SDK libraries for the connection; I want to keep plain Python to a minimum and stick with PySpark. So what are my options?
Your help would be greatly appreciated.
Here is my full code:
# Define the Azure Key Vault scope
scope = 'your_scope_here'
# Retrieve storage account names and access keys securely from Key Vault
dl_storage_account = dbutils.secrets.get(scope=scope, key="dl_storage_account_name_key")
dl_storage_account_access_key = dbutils.secrets.get(scope=scope, key="dl_storage_account_access_key_key")
blob_storage_account = dbutils.secrets.get(scope=scope, key="blob_storage_account_name_key")
blob_storage_account_access_key = dbutils.secrets.get(scope=scope, key="blob_storage_account_access_key_key")
# Set Spark configurations for accessing Azure storage
spark.conf.set(f"fs.azure.account.key.{blob_storage_account}.blob.core.windows.net", blob_storage_account_access_key)
spark.conf.set(f"fs.azure.account.key.{dl_storage_account}.dfs.core.windows.net", dl_storage_account_access_key)
# Define the input path for Azure Data Lake (redacted)
input_path = f"abfss://container-name@{dl_storage_account}.dfs.core.windows.net/path/to/directory/"
from pyspark.sql import SparkSession
from pyspark.sql.functions import schema_of_xml, expr, col, current_timestamp, lit, explode_outer, input_file_name, regexp_extract, concat_ws
from pyspark.sql.types import NullType
# Define the specific file path for Azure Data Lake (redacted)
specific_file_path = f"abfss://container-name@{dl_storage_account}.dfs.core.windows.net/path/to/directory/file-name.xml"
# Load the specified XML file
single_file_df = (
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", "Tag").load(specific_file_path)
)

# # Show a sample of the data
single_file_df.show(truncate=False)
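To separate a storage problem from an XML-reader problem, a minimal diagnostic (a sketch that stays within dbutils/PySpark, as requested) is to list the directory through the same abfss path before involving spark-xml:

# Diagnostic sketch: exercises only the ABFS authentication path.
# If this raises the same "Invalid configuration value detected for
# fs.azure.account.key" error, the storage configuration is at fault,
# not the spark-xml library.
for f in dbutils.fs.ls(input_path):
    print(f.path)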
I have tried the following approach:
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<YOUR CLIENT ID>",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="Key-vault-secret-dbx02", key="secretKV2"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<YOUR TENANT ID>/oauth2/token"
}

try:
    dbutils.fs.mount(
        source="abfss://[email protected]",
        mount_point="/mnt/raw_agent02",
        extra_configs=configs
    )
    print("Mount successful!")
except Exception as e:
    print(f"Error mounting storage: {e}")
In the code above, I mounted ADLS from Azure Databricks; see the documentation on how to mount ADLS on Azure Databricks for details. I also granted the Azure Databricks application the Key Vault Administrator and Storage Blob Data Contributor roles.
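Note that dbutils.fs.mount fails if the mount point is already in use, so it can help to inspect the current mounts first (a small sketch around the same dbutils API):

# List existing mounts; unmount first if /mnt/raw_agent02 is already taken.
for m in dbutils.fs.mounts():
    print(m.mountPoint, "->", m.source)
# dbutils.fs.unmount("/mnt/raw_agent02")  # uncomment to remount cleanly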
Once the mount succeeds, go to your cluster > Libraries > Maven Central and install the following library on the cluster:
com.databricks:spark-xml_2.12:0.18.0
Then try reading the XML file from ADLS:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Department", StringType(), True),
    StructField("Designation", StringType(), True),
    StructField("Salary", IntegerType(), True),
    StructField("JoiningDate", StringType(), True)
])

file_path = "/mnt/raw_agent02/new/sampledilip.xml"

try:
    df = spark.read.format("com.databricks.spark.xml") \
        .option("rowTag", "Employee") \
        .schema(schema) \
        .load(file_path)
    print("File loaded successfully!")

    # Cast JoiningDate from the string in the schema to a proper date
    df = df.withColumn("JoiningDate", df["JoiningDate"].cast(DateType()))
    df.printSchema()
    df.show()
except Exception as e:
    print(f"Error reading XML file: {e}")
In the code above, we define a schema for the XML file, point at the XML file inside the mounted container, and read it into a DataFrame.
Result:
File loaded successfully!
root
|-- ID: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Department: string (nullable = true)
|-- Designation: string (nullable = true)
|-- Salary: integer (nullable = true)
|-- JoiningDate: date (nullable = true)
+---+-------------+----------+-----------+------+-----------+
| ID| Name|Department|Designation|Salary|JoiningDate|
+---+-------------+----------+-----------+------+-----------+
|101| John Doe| HR| Manager| 75000| 2018-05-15|
|102| Jane Smith| Finance| Analyst| 68000| 2019-07-20|
|103|Emily Johnson| IT| Developer| 80000| 2020-03-10|
|104|Michael Brown| Marketing| Executive| 55000| 2021-08-01|
+---+-------------+----------+-----------+------+-----------+
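One more option, since the question's cluster runs 14.3 LTS: recent Databricks runtimes include native XML file format support, so the Maven library can be skipped entirely. A minimal sketch, assuming a runtime with the built-in reader and reusing the schema and file_path from above:

# Native XML reader sketch (assumes a DBR version with built-in XML
# support): the short format name "xml" replaces
# "com.databricks.spark.xml", and no extra library install is needed.
df_native = (
    spark.read.format("xml")
    .option("rowTag", "Employee")
    .schema(schema)
    .load(file_path)
)
df_native.show()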