在 DELTA 到 DELTA 流式传输期间,列 `_rescued_data` 已经存在

问题描述 投票:0回答:1

我正在开发一个 Spark Streaming 作业,该作业从源 Delta 表读取数据并使用 SCD2 逻辑将其写入目标 Delta 表。但是,我遇到了与“_rescued_data”列相关的问题。

将原始数据流式传输到第一个暂存 Delta 表时,Spark 会自动添加“_rescued_data”列来捕获与预期架构不匹配的数据。 在后续流中,尝试将数据写入最终的 Delta 表(使用 SCD2 实现)会导致错误:“列 _rescued_data 已存在。”

我尝试在批处理数据帧(batch_df)中将“_rescued_data”重命名为“_rescued_data_src”。然而,由于列名不匹配,这会导致batch_df和目标Delta表(sink_df)之间出现连接问题。

我正在寻找有关如何在 Delta 表中使用 SCD2 逻辑执行流式处理时处理“_rescued_data”列的指导。理想情况下,我想避免现有的列错误并确保为 SCD2 实现正确处理数据。

错误: [COLUMN_ALREADY_EXISTS] 列

_rescued_data
已存在。考虑选择其他名称或重命名现有列。 SQLSTATE:42711

azure pyspark databricks spark-streaming
1个回答
0
投票

按照本文所述对增量表进行合并 文档

在这里,您可以设置列值或更新所有列。 以下是 pyspark 中的示例代码。


Targetdf.alias('target') \
  .merge(
    batchdf.alias('updates'),
    'target.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "_rescued_data":"updates._rescued_data"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName"
    }
  ) \
  .execute()

或更新所有列值。

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

请参阅上面提供的文档,了解合并数据时的更多条件操作。

© www.soinside.com 2019 - 2024. All rights reserved.