我的任务是修改 Azure Databricks 中的 CSV 摄取。当前的实现使用以下设置:
source_query = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.schema(defined_schema)
.option("enforceSchema", "false")
.option("cloudFiles.schemaLocation", checkpoint_path)
.option("skipRows", 0)
.option("header", True)
)
result = (
source_query.load(input_path)
.withColumn("original_filename", input_file_name())
.writeStream.format("delta")
.option("checkpointLocation", checkpoint_path)
.trigger(once=True)
.toTable(table_name)
)
当 CSV 文件中的列顺序不正确时,我需要修改 CSV 摄取以考虑灵活性。
假设 CSV 文件具有以下列:
客户 ID(整数)
客户名称(字符串)
订单号(整数)
增量表已使用以下模式定义:
customder_id(整数)
客户名称(字符串)
订单号(整数)
请注意,CSV 源文件和目标增量表中的列名称并不完全相同。上面代码中的“define_schema”参数是增量表的模式。
现在我假设我收到了格式错误的 CSV 文件,列顺序错误。例如,CSV 文件中的列顺序如下所示:
客户 ID(整数)
订单号(整数)
客户名称(字符串)
这意味着“ORDER NUMBER”的数据将被摄取到增量表中的“customer_name”列中。
我正在寻找一种解决方案,以避免将数据写入增量表中的错误列。
一个想法是使用
saveAsTable
写入数据,因为它使用列名称来查找正确的列位置。但是,由于架构是在上面的代码中定义的,因此笔记本中 DataFrame 中的列名称并不完全是 CSV 文件中的列名称。换句话说,CSV 文件中有关列名的信息会丢失。此外,CSV 文件中的列名称与目标表不同。所以,这个解决方案不能使用。
有什么办法解决这个问题吗?我正在寻找一种使用 PySpark 处理正确文件和格式错误文件的解决方案。
如果我将
enforceSchema
设置为True
,当CSV文件中的列顺序错误时,是否会导致摄取失败?
注: 请注意,有时增量表的数据类型与 CSV 源文件不同。我也希望有这个选项来定义模式。例如,我可能想定义增量表的模式如下(所有列都是字符串):
customder_id(字符串)
客户名称(字符串)
订单号(字符串)
使用 StructType 和 StructField 类定义目标表的架构有帮助吗?
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
spark = SparkSession.builder.appName("CSV to Delta").getOrCreate()
# Define the schema of the target table
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("salary", DoubleType(), True)
])
# Read the CSV file with inconsistently ordered columns
df = spark.read.csv("path/to/csv/file.csv", header=True, schema=schema)
# Write the data to a Delta table
df.write.format("delta").mode("append").save("path/to/delta/table")
*已编辑