我有一个包含10TB数据的mysql数据库,我已经通过flink-cdc使用iceberg表格式将所有数据流式传输到s3。
我想检查是否有数据丢失,或者是否有不匹配的数据值。
目前,我有 2 个使用 pyspark 的解决方案。
# initialize two table in spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, md5
spark = SparkSession.builder.getOrCreate()
table_name = 'target_table'
df_iceberg_table = read_iceberg_table_using_spark(table_name)
df_mysql_table = read_mysql_table_using_spark(table_name)
table_columns = get_table_columns(table_name)
首先是计算每一行的哈希值并比较它们以检查数据是否相同。
df_mysql_table_hash = (
df_mysql_table
.select(
col('id'),
md5(concat_ws('|', *table_columns)).alias('hash')
)
)
df_iceberg_table_hash = (
df_iceberg_table
.select(
col('id'),
md5(concat_ws('|', *table_columns)).alias('hash')
)
)
df_mysql_table_hash.createOrReplaceTempView('mysql_table_hash')
df_iceberg_table_hash.createOrReplaceTempView('iceberg_table_hash')
df_diff = spark.sql('''
select
d1.id as mysql_id,
d2.id as iceberg_id,
d1.hash as mysql_hash,
d2.hash as iceberg_hash
from mysql_table_hash d1
left outer join iceberg_table_hash d2 on d1.id = d2.id
where false
or d2.id is null
or d1.hash <> d2.hash
''')
# save df_diff to some where
第二种是使用pyspark中的subtract函数
df_diff = df_mysql_table.subtract(df_iceberg_table)
# save df_diff to some where
哪一个更好更快? 有没有更好的方法来实现这个目标?
与
subtract()
类似,您也可以使用exceptAll()
方法来查找两个DataFrame之间的差异。对我来说,它在单元测试中特别有用。如果 exceptAll()
的结果是一个空的 DataFrame,则意味着两个 DataFrame 是相同的。否则,exceptAll()
返回的非空 DataFrame 包含两个 DataFrame 之间不同的行。
# Find the differences between df1 and df2
diff = df1.exceptAll(df2)
# Check if there are any differences
if diff.count() == 0:
print("DataFrames are identical.")
else:
print("DataFrames have differences.")
diff.show()
另请注意,
exceptAll()
同时根据列值和行顺序比较DataFrame,而subtract()
仅根据列值比较DataFrame,忽略行顺序。如果性能是关键因素并且行顺序并不重要,则使用 subtract()
可能比 exceptAll()
更有效,但使用 subtract()
,您无法检测重复的行(如果有)。