我正在开发一个 Spark Streaming 作业,该作业从源 Delta 表读取数据并使用 SCD2 逻辑将其写入目标 Delta 表。但是,我遇到了与“_rescued_data”列相关的问题。
将原始数据流式传输到第一个暂存 Delta 表时,Spark 会自动添加“_rescued_data”列来捕获与预期架构不匹配的数据。 在后续流中,尝试将数据写入最终的 Delta 表(使用 SCD2 实现)会导致错误:“列 _rescued_data 已存在。”
我尝试在批处理数据帧(batch_df)中将“_rescued_data”重命名为“_rescued_data_src”。然而,由于列名不匹配,这会导致batch_df和目标Delta表(sink_df)之间出现连接问题。
我正在寻找有关如何在 Delta 表中使用 SCD2 逻辑执行流式处理时处理“_rescued_data”列的指导。理想情况下,我想避免现有的列错误并确保为 SCD2 实现正确处理数据。
错误: [COLUMN_ALREADY_EXISTS] 列
_rescued_data
已存在。考虑选择其他名称或重命名现有列。 SQLSTATE:42711
按照本文所述对增量表进行合并 文档。
在这里,您可以设置列值或更新所有列。 以下是 pyspark 中的示例代码。
Targetdf.alias('target') \
.merge(
batchdf.alias('updates'),
'target.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"_rescued_data":"updates._rescued_data"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName"
}
) \
.execute()
或更新所有列值。
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
请参阅上面提供的文档,了解合并数据时的更多条件操作。