问题陈述:
我们有一个 Databricks 作业,其中多个任务并行运行。每个任务都将 Spark 数据帧写入 Azure SQL 数据库表。每个任务都会写入自己的目标表。有些人写了数千条记录,但也有少数人写了数百万条记录。由于设计的本质,该过程是截断加载,使其成为增量过程也很棘手。
这个过程最初运行良好。但在添加一些较大的表负载后,我们偶尔会在写入后遇到 SQL Server 表中的重复行。我们分析了这些记录,发现这与其中一个数据帧分区被写入两次有关。我们不知道确切的根本原因,但假设它与 SQL Server -> Spark 集群之间的通信问题有关,其中 SQL Server 通知 Spark 在记录实际上已经写入并提交时重试该任务。可能是由于 Azure SQL 数据库端的限制,因为加载期间 DTU 使用率上限为 100%。
写入代码:
def write_to_sqldb_table(self, df: DataFrame, target_table_name: str, num_partitions: int = 1) -> None:
df.write.format("SQLSERVER").option("host", self.sql_host_name).option("database", self.sql_db_name).option(
"dbtable", target_table_name
).option("user", self.sql_user_id).option("password", self.sql_user_password).option("truncate", "true").option(
"batchsize", self.batch_size
).option("numPartitions", num_partitions).mode("overwrite").save()
到目前为止我们尝试过什么
在 SQL 表上定义唯一约束并在 PySpark 中处理错误
def write_to_sqldb_table_with_error_handling(self, df: DataFrame, target_table_name: str, num_partitions: int = 1) -> None:
try:
df.write.format("SQLSERVER").option("host", self.sql_host_name).option("database", self.sql_db_name).option(
"dbtable", target_table_name
).option("user", self.sql_user_id).option("password", self.sql_user_password).option("truncate", "true").option(
"batchsize", self.batch_size
).option("numPartitions", num_partitions).mode("overwrite").save()
except Exception as e:
error_message = str(e)
if "Violation of UNIQUE KEY constraint" in error_message:
return
else:
raise e
在 SQL 表上定义唯一约束并在表级别启用 IGNORE_DUP_KEY 选项
要求:
我们正在寻找一种强大的解决方案,避免重复插入,同时保持良好的性能。我目前正在考虑尝试以下 2 个解决方案,但正在寻找其他人提供的任何可能有帮助的意见或经验!
选项 1:使用适用于 Azure SQL 的 Apache Spark 连接器,可靠性级别 =“NO_DUPLICATES”
选项 2:在 SQL Server 端处理重复数据删除,方法是将 IDENTITY 列作为 SQL Server 表上的技术键,并从我的 Spark 作业中调用存储过程以在插入操作后删除重复项
DTU利用率达到100%,SQL限流会导致重试,从而引入这样的并发问题。
要减少 SQL 限制和命中 DTU 使用,您可以尝试以下选项:
您可以像下面这样实现SQL解决方案:
merge
方法。
MERGE
,用 sql 端的暂存数据替换并插入目标表中的现有记录。