我正在尝试使用预定义的架构读取 Spark 中的 csv 文件。我用的:
df = (spark.read.format("csv")
.schema(schema)
.option("sep", ";")
.load(
file_path,
header=True,
encoding="utf-8"))
在这种情况下,数据加载没有任何问题。 现在,当我提供不良记录路径时,我没有得到任何记录:
df = (
spark.read.format("csv")
.schema(schema)
.option("sep", ";")
.option(
"badRecordsPath",
bad_records_path,
)
.load(
file_path,
header=True,
encoding="utf-8",
))
所有记录都转储到错误记录路径中。
MALFORMED_CSV_RECORD (SQLSTATE: KD000)
即使使用的 schema
完全相同。为什么我会收到此错误?
您可以使用
.option("mode", "DROPMALFORMED")
跳过坏行。
df = sqlContext.read \
.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("delimiter", ";") \
.option("mode", "DROPMALFORMED") \
.option("charset", "UTF-8") \
.schema(schema) \
.load(final_file_path)
df.display()
结果:
name age job
John 30 Developer
Jane 25 Designer
解决方案代码会跳过 CSV 文件中分隔符数量不正确或与指定架构不匹配的行。 通过在初始数据读取和解析过程中过滤掉这些行,可以防止这些行在稍后的代码中引起错误。