我正在使用“_corrupt_record”列根据文件架构解析 csv 文件中的错误记录。我现在有一个用例,我希望将新列添加到文件中,但是我可能无法控制此更改何时发生。因此,我希望在某个时间源文件中出现一个新列
col_B
,但目前可能不存在。
file_schema = StructType([
StructField("col_A", StringType(), True),
StructField("col_B", LongType(), True), # New column
StructField("_corrupt_record", StringType(), True)
])
df_src = (spark.read.format('csv')
.option("inferSchema",False)
.option("header",True)
.schema(file_schema)
.option("columnNameOfCorruptRecord", "_corrupt_record")
.load(src_file_location)
.cache())
当源文件没有附加列时,有没有办法阻止所有行进入
_corrupt_record
列?
看看好文章:
你想要什么:
1/如果某些行没有所有列,则 readCSV 出错?
将选项“模式”设置为打开
2/损坏的_记录列上的错误行: 当我执行你的代码时,它适用于
输入:
id,col_a,col_int,col_b
A,col_A_A,1,new_col_a
B,col_A_B,col_int,new_col_b
C,col_A_C,-55
输出:
id | col_A | col_int | col_B | _损坏_记录 |
---|---|---|---|---|
A | col_A_A | 1 | new_col_a | 空 |
空 | 空 | 空 | 空 | B,col_A_B,col_int... |
C | col_A_C | -55 | 空 | C,col_A_C,-55 |
代码(在java中,但与python/scala非常相似)
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("col_A", DataTypes.StringType, true),
DataTypes.createStructField("col_int", DataTypes.IntegerType, true),
DataTypes.createStructField("col_B", DataTypes.StringType, true),
DataTypes.createStructField("_corrupt_record", DataTypes.StringType, true)
});
Dataset<Row> ds = ss.read()
.option("header", "true")
// .option("mode","DROPMALFORMED") //if you need to
.option("columnNameOfCorruptRecord", "_corrupt_record")
.schema(schema)
.csv("file.csv");