我有一个按小时分区的 Delta Lake 表。表架构包括:
colA
(字符串类型)colB
(int类型)colC
(结构类型)当我执行历史加载时,所有分区都已正确填充。然而,在我每小时将新数据写入该表的过程中,有些时候 colC 是未定义的。这会导致 colC 的推断数据类型与 Delta Lake 表中的现有架构不同。
此外,我不喜欢在原始文件的数据读取期间定义架构,因为架构会随着时间的推移而演变。
示例场景:
历史负载:
每小时工作:
问题:
对于第 3 小时,colC 未定义,导致推断的模式不匹配。
问题:如何处理这种 colC 有时未定义的情况,而不在数据读取期间定义固定模式?
示例代码:
inferred_schema = spark.read.json(df_raw.rdd.map(lambda x: x['Body'])).schema
df = df_raw.withColumn('BodyNew', from_json(col(), inferred_schema))
opts = {
"mergeSchema": "true",
"overwriteSchema": "false",
"partitionOverwriteMode":"dynamic"
}
df.write.partitionBy("hour")\
.mode("overwrite")\
.format("delta")\
.options(**opts)\
.save(path)
# above code works when I read data for a big date range
# but fails when I run it for few selective hours,
# because for those hours some columns are undefined
# which lead to inferred schema to be different.
您可以考虑读取现有的表模式并将其附加到输入数据帧。粗略的代码可能类似于下面的代码。
from pyspark.sql.functions import col, from_json
existing_table_name = "<add the output table name>"
inferred_schema = spark.read.table(existing_table_name).schema
df = df_raw.withColumn('BodyNew', from_json(col('Body'), inferred_schema))
opts = {
"mergeSchema": "true",
"overwriteSchema": "false",
"partitionOverwriteMode": "dynamic"
}
# Step 4: Write the DataFrame to a Delta table
df.write ...