我们需要从故障点重新启动从 databricks 工作流运行的 Azure databricks 笔记本。我已经了解了 databricks 中的修复和重新运行概念,并做了一个小 poc 从故障点重新启动任务,效果很好
但是,在实际场景中,我正在运行 pyspark 笔记本,它执行 DML 操作,例如使用 sqark sql 进行更新和插入。如果其中任何一个失败并且我们进行修复运行,它将从头开始运行笔记本。可能有以下情况在这种情况下重复插入。有什么办法可以避免它。我正在考虑使用合并来根据主键进行插入和更新。这会解决使用修复和重新运行从故障点重新启动的问题吗?
我已经了解了databricks中的修复和重新运行概念,并做了一个小poc从故障点重新启动任务,并且工作正常
您可以使用以下代码进行合并操作。
from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
有关合并的更多信息,请参阅此文档。
最初,您在失败时开始进行更新插入,进行修复并重新运行,这仅更新插入数据。