Spark 读取有不良记录的 CSV

问题描述 投票:0回答:1

我正在尝试使用预定义的架构读取 Spark 中的 csv 文件。我用的:

df = (spark.read.format("csv")
        .schema(schema)
        .option("sep", ";")
        .load(
            file_path,
            header=True,
            encoding="utf-8"))

在这种情况下,数据加载没有任何问题。 现在,当我提供不良记录路径时,我没有得到任何记录:

df = (
        spark.read.format("csv")
        .schema(schema)
        .option("sep", ";")
        .option(
            "badRecordsPath",
            bad_records_path,
        )
        .load(
            file_path,
            header=True,
            encoding="utf-8",
        ))

所有记录都转储到错误记录路径中。

MALFORMED_CSV_RECORD (SQLSTATE: KD000)
即使使用的
schema
完全相同。为什么我会收到此错误?

apache-spark pyspark apache-spark-sql databricks azure-databricks
1个回答
0
投票

您可以使用

.option("mode", "DROPMALFORMED")
跳过坏行。

df = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("delimiter", ";") \
    .option("mode", "DROPMALFORMED") \
    .option("charset", "UTF-8") \
    .schema(schema) \
    .load(final_file_path)
df.display()

结果:

name    age job
John    30  Developer
Jane    25  Designer

解决方案代码会跳过 CSV 文件中分隔符数量不正确或与指定架构不匹配的行。 通过在初始数据读取和解析过程中过滤掉这些行,可以防止这些行在稍后的代码中引起错误。

© www.soinside.com 2019 - 2024. All rights reserved.