使用 Databricks 将 Spark 数据帧写入 Azure SQL Server 数据库时处理重复记录

问题描述 投票:0回答:1

问题陈述:

我们有一个 Databricks 作业,其中多个任务并行运行。每个任务都将 Spark 数据帧写入 Azure SQL 数据库表。每个任务都会写入自己的目标表。有些人写了数千条记录,但也有少数人写了数百万条记录。由于设计的本质,该过程是截断加载,使其成为增量过程也很棘手。

这个过程最初运行良好。但在添加一些较大的表负载后,我们偶尔会在写入后遇到 SQL Server 表中的重复行。我们分析了这些记录,发现这与其中一个数据帧分区被写入两次有关。我们不知道确切的根本原因,但假设它与 SQL Server -> Spark 集群之间的通信问题有关,其中 SQL Server 通知 Spark 在记录实际上已经写入并提交时重试该任务。可能是由于 Azure SQL 数据库端的限制,因为加载期间 DTU 使用率上限为 100%。

写入代码:

def write_to_sqldb_table(self, df: DataFrame, target_table_name: str, num_partitions: int = 1) -> None:

        df.write.format("SQLSERVER").option("host", self.sql_host_name).option("database", self.sql_db_name).option(
            "dbtable", target_table_name
        ).option("user", self.sql_user_id).option("password", self.sql_user_password).option("truncate", "true").option(
            "batchsize", self.batch_size
        ).option("numPartitions", num_partitions).mode("overwrite").save()
  • batch_size 设置为 10.000,过去我们对此进行了性能测试,这似乎表现最好。
  • num_partitions 是相对于数据帧的记录计算的:total_records/batch_size,然后四舍五入到最接近的 4 的倍数,因为我们的集群有 4 个核心的倍数

到目前为止我们尝试过什么

在 SQL 表上定义唯一约束并在 PySpark 中处理错误

  • 独特的约束做了他们必须做的事情:使 Spark 任务在插入重复记录时失败
  • 尝试通过检查文本“唯一约束违规”然后从函数返回来处理 Spark 中的错误,但无法使其正常工作。 Spark 作业不断失败,而不是捕获错误并继续执行剩余的插入,直到作业成功完成。
def write_to_sqldb_table_with_error_handling(self, df: DataFrame, target_table_name: str, num_partitions: int = 1) -> None:

        try:
            df.write.format("SQLSERVER").option("host", self.sql_host_name).option("database", self.sql_db_name).option(
                "dbtable", target_table_name
            ).option("user", self.sql_user_id).option("password", self.sql_user_password).option("truncate", "true").option(
                "batchsize", self.batch_size
            ).option("numPartitions", num_partitions).mode("overwrite").save()

        except Exception as e:
            error_message = str(e)

            if "Violation of UNIQUE KEY constraint" in error_message:
                return
            else:
                raise e

在 SQL 表上定义唯一约束并在表级别启用 IGNORE_DUP_KEY 选项

  • 实施 IGNORE_DUP_KEY 后,较大表的插入操作陷入僵局。
  • 尝试减少batch_size,但这没有帮助。

要求:

我们正在寻找一种强大的解决方案,避免重复插入,同时保持良好的性能。我目前正在考虑尝试以下 2 个解决方案,但正在寻找其他人提供的任何可能有帮助的意见或经验!

  • 选项 1:使用适用于 Azure SQL 的 Apache Spark 连接器,可靠性级别 =“NO_DUPLICATES”

    • 此连接器经常出现在与编写 Spark 数据框 -> Azure SQL 数据库时提高性能相关的讨论中
    • 最新版本仅支持 Spark 3.4,而我们使用的是 Spark 3.5,但我认为值得一试
  • 选项 2:在 SQL Server 端处理重复数据删除,方法是将 IDENTITY 列作为 SQL Server 表上的技术键,并从我的 Spark 作业中调用存储过程以在插入操作后删除重复项

pyspark jdbc duplicates azure-sql-database azure-databricks
1个回答
0
投票

DTU利用率达到100%,SQL限流会导致重试,从而引入这样的并发问题。

要减少 SQL 限制和命中 DTU 使用,您可以尝试以下选项:

  • 减少分区数量或减少写入 Azure SQL 数据库的并行任务数量可以减少 SQL 限制。
  • 需要减小批量大小以管理较大表上的较高负载。首先将其减少一半并监控 DTU 使用情况。

您可以像下面这样实现SQL解决方案:

  • 使用
    merge
    方法。
    1. 从 Spark 将数据加载到 SQL 中的临时表中。
    2. 执行 SQL
      MERGE
      ,用 sql 端的暂存数据替换并插入目标表中的现有记录。
  • 这种方法避免了截断并降低了因重试而导致不一致的风险。
© www.soinside.com 2019 - 2024. All rights reserved.