使用架构读取 CSV 文件时忽略缺失列

问题描述 投票:0回答:1

我正在使用“_corrupt_record”列根据文件架构解析 csv 文件中的错误记录。我现在有一个用例,我希望将新列添加到文件中,但是我可能无法控制此更改何时发生。因此,我希望在某个时间源文件中出现一个新列

col_B
,但目前可能不存在。

file_schema = StructType([
     StructField("col_A", StringType(), True),
     StructField("col_B", LongType(), True), # New column
     StructField("_corrupt_record", StringType(), True)
])
df_src = (spark.read.format('csv')
                    .option("inferSchema",False)
                    .option("header",True)
                    .schema(file_schema)
                    .option("columnNameOfCorruptRecord", "_corrupt_record")
                    .load(src_file_location)
                    .cache())

当源文件没有附加列时,有没有办法阻止所有行进入

_corrupt_record
列?

apache-spark pyspark pyspark-schema
1个回答
0
投票

看看好文章:

https://medium.com/@patidar.rahul8392/dealing-with-bad-or-corrupt-records-in-apache-spark-bab64b4d24b3

你想要什么:

1/如果某些行没有所有列,则 readCSV 出错?

将选项“模式”设置为打开

  • DROPMALFORMED:忽略整个损坏的记录。这种模式是 CSV 内置函数不支持。
  • FAILFAST:遇到损坏的记录时抛出异常。

2/损坏的_记录列上的错误行: 当我执行你的代码时,它适用于

输入:

id,col_a,col_int,col_b
A,col_A_A,1,new_col_a
B,col_A_B,col_int,new_col_b
C,col_A_C,-55

输出:

id col_A col_int col_B _损坏_记录
A col_A_A 1 new_col_a
B,col_A_B,col_int...
C col_A_C -55 C,col_A_C,-55

代码(在java中,但与python/scala非常相似)

        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("id",  DataTypes.StringType, true),
                DataTypes.createStructField("col_A",  DataTypes.StringType, true),
                DataTypes.createStructField("col_int",  DataTypes.IntegerType, true),
                DataTypes.createStructField("col_B", DataTypes.StringType, true),
                DataTypes.createStructField("_corrupt_record", DataTypes.StringType, true)
        });

        Dataset<Row> ds = ss.read()
                .option("header", "true")
//                .option("mode","DROPMALFORMED") //if you need to
                .option("columnNameOfCorruptRecord", "_corrupt_record")
                .schema(schema)
                .csv("file.csv");
© www.soinside.com 2019 - 2024. All rights reserved.