如何将数据插入到 Databricks 中更改架构的增量表中。
在 Databricks Scala 中,我正在分解 Map 列并将其加载到增量表中。我有一个预定义的增量表模式。
假设该架构有 4 列
A
、B
、C
、D
。
因此,有一天,我使用以下代码将包含 4 列的数据框加载到增量表中。
loadfinaldf.write.format("delta").option("mergeSchema", "true")\
.mode("append").insertInto("table")
数据框中的列每天都会变化。例如,在第 2 天,添加了两个新列
E
、F
,并且没有 C
列。现在我的数据框中有 5 列 A
、B
、D
、E
、F
。当我将此数据加载到增量表中时,应在表模式中动态创建列E
和F
,并且相应的数据应加载到这两列中,并且列C应填充为NULL。我假设 spark.conf.set("spark.databricks.delta.schema.autoMerge","true")
可以完成这项工作。但我无法实现这一目标。
我的做法: 我正在考虑列出预定义的增量模式和数据帧模式,并在将其加载到增量表之前对两者进行比较。
你能使用一些Python逻辑吗?
result = pd.concat([df1, df2], axis=1, join="inner")
然后,将数据帧推送到动态创建的 SQL 表中?
https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
如果您需要在执行插入之前更新架构(在某些情况下无法直接合并),那么至少在 SQL(DBR 15+、Unity Catalog)中您可以使用合并与架构演化,如下所示:
create or replace table catalog.schema.table_1 (col1 int, col2 int);
create or replace table catalog.schema.table_2 (col1 int, col2 int, col3 int);
merge with schema evolution into catalog.schema.table_1 as target
using (select * from catalog.schema.table_2 where 1 = 0) as source
-- ^^^^^^^^^^^
on target.col1 = source.col1
when not matched then insert *;
然后,如果您查看 table_1,它将具有 col3,即使没有插入行。