Delta Lake 表写入无需架构强制执行

问题描述 投票:0回答:1

我有一个按小时分区的 Delta Lake 表。表架构包括:

  • colA
    (字符串类型)
  • colB
    (int类型)
  • colC
    (结构类型)

当我执行历史加载时,所有分区都已正确填充。然而,在我每小时将新数据写入该表的过程中,有些时候 colC 是未定义的。这会导致 colC 的推断数据类型与 Delta Lake 表中的现有架构不同。

此外,我不喜欢在原始文件的数据读取期间定义架构,因为架构会随着时间的推移而演变。

示例场景:

历史负载:

  • 第 1 小时:colA =“value1”,colB = 100,colC = {“field1”:“data1”,“field2”:10}
  • 第 2 小时:colA =“value2”,colB = 200,colC = {“field1”:“data2”,“field2”:20}

每小时工作:

  • 第 3 小时:colA =“value3”,colB = 300,colC = 未定义
  • 第 4 小时:colA =“value4”,colB = 400,colC = {“field1”:“data4”,“field2”:40}

问题:

对于第 3 小时,colC 未定义,导致推断的模式不匹配。

问题:如何处理这种 colC 有时未定义的情况,而不在数据读取期间定义固定模式?

示例代码:

inferred_schema = spark.read.json(df_raw.rdd.map(lambda x: x['Body'])).schema
df = df_raw.withColumn('BodyNew', from_json(col(), inferred_schema))
opts = {
    "mergeSchema": "true",
    "overwriteSchema": "false",
    "partitionOverwriteMode":"dynamic"
    }
df.write.partitionBy("hour")\
    .mode("overwrite")\
    .format("delta")\
    .options(**opts)\
    .save(path)
# above code works when I read data for a big date range
# but fails when I run it for few selective hours, 
# because for those hours some columns are undefined
# which lead to inferred schema to be different.
apache-spark databricks bigdata delta-lake
1个回答
0
投票

您可以考虑读取现有的表模式并将其附加到输入数据帧。粗略的代码可能类似于下面的代码。

from pyspark.sql.functions import col, from_json

existing_table_name = "<add the output table name>"
inferred_schema = spark.read.table(existing_table_name).schema

df = df_raw.withColumn('BodyNew', from_json(col('Body'), inferred_schema))

opts = {
    "mergeSchema": "true",
    "overwriteSchema": "false",
    "partitionOverwriteMode": "dynamic"
}

# Step 4: Write the DataFrame to a Delta table
df.write ...
© www.soinside.com 2019 - 2024. All rights reserved.