将数据插入到具有更改架构的增量表中

问题描述 投票:0回答:2

如何将数据插入到 Databricks 中更改架构的增量表中。

在 Databricks Scala 中,我正在分解 Map 列并将其加载到增量表中。我有一个预定义的增量表模式。

假设该架构有 4 列

A
B
C
D

因此,有一天,我使用以下代码将包含 4 列的数据框加载到增量表中。

loadfinaldf.write.format("delta").option("mergeSchema", "true")\
       .mode("append").insertInto("table")

数据框中的列每天都会变化。例如,在第 2 天,添加了两个新列

E
F
,并且没有
C
列。现在我的数据框中有 5 列
A
B
D
E
F
。当我将此数据加载到增量表中时,应在表模式中动态创建列
E
F
,并且相应的数据应加载到这两列中,并且列C应填充为NULL。我假设
spark.conf.set("spark.databricks.delta.schema.autoMerge","true")
可以完成这项工作。但我无法实现这一目标。

我的做法: 我正在考虑列出预定义的增量模式和数据帧模式,并在将其加载到增量表之前对两者进行比较。

scala pyspark databricks
2个回答
0
投票

你能使用一些Python逻辑吗?

result = pd.concat([df1, df2], axis=1, join="inner")

然后,将数据帧推送到动态创建的 SQL 表中?

https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html


0
投票

如果您需要在执行插入之前更新架构(在某些情况下无法直接合并),那么至少在 SQL(DBR 15+、Unity Catalog)中您可以使用合并与架构演化,如下所示:

create or replace table catalog.schema.table_1 (col1 int, col2 int);

create or replace table catalog.schema.table_2 (col1 int, col2 int, col3 int);

merge with schema evolution into catalog.schema.table_1 as target
using (select * from catalog.schema.table_2 where 1 = 0) as source
--                                          ^^^^^^^^^^^   
on target.col1 = source.col1
when not matched then insert *;

然后,如果您查看 table_1,它将具有 col3,即使没有插入行。

© www.soinside.com 2019 - 2024. All rights reserved.