使用 Pyspark 合并 DeltaTable 中的多个条件

问题描述 投票:0回答:2

我使用 Delta Table 构建了一个流程,通过

ID_CLIENT
ID_PRODUCT
键更新插入我的数据,但我收到错误:

合并多个源行匹配

是否可以进行多个条件的合并?

tabela_spec.alias("current") \
.merge(dfrn.alias("update"), "current.id_client = update.id_client AND current.id_product = update.id_product") \
.whenMatchedUpdateAll().whenNotMatchedInsertAll() \
.execute()
python apache-spark pyspark delta-lake
2个回答
3
投票

是的,应该可以。

替代格式:

tabela_spec.alias("current") \
  .merge(
    dfrn.alias("update"),
    "current.id_client = update.id_client" + \
    " AND current.id_product = update.id_product"
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

0
投票

之前提出的答案有效吗?我遇到了类似的问题,并且似乎只适用于一种条件,而不是两种条件:

导入增量 Factevents = delta.DeltaTable.forPath(spark, 'abfss://********************.dfs.core.windows.net/silver/C365/events/' )

合并新数据

( Factevents.alias('事件') 。合并( dfNewData.alias('新'), “events.uprn = New.uprn AND events.id = New.id” ) .whenMatchedUpdateAll() .whenNotMatchedInsertAll() 。执行() )

© www.soinside.com 2019 - 2024. All rights reserved.