在Azure Databricks笔记本中读取CSV文件,当CSV文件中的列顺序错误时如何读取数据?

问题描述 投票:0回答:1

我的任务是修改 Azure Databricks 中的 CSV 摄取。当前的实现使用以下设置:

source_query = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .schema(defined_schema)
    .option("enforceSchema", "false")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("skipRows", 0)
    .option("header", True)
)

result = (
    source_query.load(input_path)
    .withColumn("original_filename", input_file_name())
    .writeStream.format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(once=True)
    .toTable(table_name)
)

当 CSV 文件中的列顺序不正确时,我需要修改 CSV 摄取以考虑灵活性。

假设 CSV 文件具有以下列:

  • 客户 ID(整数)

  • 客户名称(字符串)

  • 订单号(整数)

增量表已使用以下模式定义:

  • customder_id(整数)

  • 客户名称(字符串)

  • 订单号(整数)

请注意,CSV 源文件和目标增量表中的列名称并不完全相同。上面代码中的“define_schema”参数是增量表的模式。

现在我假设我收到了格式错误的 CSV 文件,列顺序错误。例如,CSV 文件中的列顺序如下所示:

  • 客户 ID(整数)

  • 订单号(整数)

  • 客户名称(字符串)

这意味着“ORDER NUMBER”的数据将被摄取到增量表中的“customer_name”列中。

我正在寻找一种解决方案,以避免将数据写入增量表中的错误列。

一个想法是使用

saveAsTable
写入数据,因为它使用列名称来查找正确的列位置。但是,由于架构是在上面的代码中定义的,因此笔记本中 DataFrame 中的列名称并不完全是 CSV 文件中的列名称。换句话说,CSV 文件中有关列名的信息会丢失。此外,CSV 文件中的列名称与目标表不同。所以,这个解决方案不能使用。

有什么办法解决这个问题吗?我正在寻找一种使用 PySpark 处理正确文件和格式错误文件的解决方案。

如果我将

enforceSchema
设置为
True
,当CSV文件中的列顺序错误时,是否会导致摄取失败?

注: 请注意,有时增量表的数据类型与 CSV 源文件不同。我也希望有这个选项来定义模式。例如,我可能想定义增量表的模式如下(所有列都是字符串):

  • customder_id(字符串)

  • 客户名称(字符串)

  • 订单号(字符串)

csv save azure-databricks data-ingestion malformed
1个回答
0
投票

使用 StructType 和 StructField 类定义目标表的架构有帮助吗?

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName("CSV to Delta").getOrCreate()

# Define the schema of the target table
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True)
])

# Read the CSV file with inconsistently ordered columns
df = spark.read.csv("path/to/csv/file.csv", header=True, schema=schema)

# Write the data to a Delta table
df.write.format("delta").mode("append").save("path/to/delta/table")

*已编辑

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.