重新启动 Databricks 工作流程中失败的任务

问题描述 投票:0回答:1

我们需要从故障点重新启动从 databricks 工作流运行的 Azure databricks 笔记本。我已经了解了 databricks 中的修复和重新运行概念,并做了一个小 poc 从故障点重新启动任务,效果很好

https://www.databricks.com/blog/2022/05/06/save-time-and-money-on-data-and-ml-workflows-with-repair-and-rerun.html

但是,在实际场景中,我正在运行 pyspark 笔记本,它执行 DML 操作,例如使用 sqark sql 进行更新和插入。如果其中任何一个失败并且我们进行修复运行,它将从头开始运行笔记本。可能有以下情况在这种情况下重复插入。有什么办法可以避免它。我正在考虑使用合并来根据主键进行插入和更新。这会解决使用修复和重新运行从故障点重新启动的问题吗?

我已经了解了databricks中的修复和重新运行概念,并做了一个小poc从故障点重新启动任务,并且工作正常

https://www.databricks.com/blog/2022/05/06/save-time-and-money-on-data-and-ml-workflows-with-repair-and-rerun.html

databricks azure-databricks restart
1个回答
0
投票

您可以使用以下代码进行合并操作。

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

有关合并的更多信息,请参阅此文档

最初,您在失败时开始进行更新插入,进行修复并重新运行,这仅更新插入数据。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.