flink-cdc 将我的所有数据从数据库流式传输到数据湖后,是否有更好的方法来检查数据丢失和不匹配的数据?

问题描述 投票:0回答:1

我有一个包含10TB数据的mysql数据库,我已经通过flink-cdc使用iceberg表格式将所有数据流式传输到s3。

我想检查是否有数据丢失,或者是否有不匹配的数据值。

目前,我有 2 个使用 pyspark 的解决方案。

# initialize two table in spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, md5

spark = SparkSession.builder.getOrCreate()

table_name = 'target_table'
df_iceberg_table = read_iceberg_table_using_spark(table_name)
df_mysql_table = read_mysql_table_using_spark(table_name)
table_columns = get_table_columns(table_name)

首先是计算每一行的哈希值并比较它们以检查数据是否相同。

df_mysql_table_hash = (
    df_mysql_table
        .select(
            col('id'),
            md5(concat_ws('|', *table_columns)).alias('hash')
        )
)

df_iceberg_table_hash = (
    df_iceberg_table
        .select(
            col('id'),
            md5(concat_ws('|', *table_columns)).alias('hash')
        )
)

df_mysql_table_hash.createOrReplaceTempView('mysql_table_hash')
df_iceberg_table_hash.createOrReplaceTempView('iceberg_table_hash')

df_diff = spark.sql('''
    select 
        d1.id as mysql_id, 
        d2.id as iceberg_id, 
        d1.hash as mysql_hash, 
        d2.hash as iceberg_hash
    from mysql_table_hash d1
    left outer join iceberg_table_hash d2 on d1.id = d2.id
    where false
        or d2.id is null
        or d1.hash <> d2.hash
''')

# save df_diff to some where

第二种是使用pyspark中的subtract函数

df_diff = df_mysql_table.subtract(df_iceberg_table)

# save df_diff to some where

哪一个更好更快? 有没有更好的方法来实现这个目标?

python apache-spark bigdata iceberg
1个回答
0
投票

subtract()
类似,您也可以使用
exceptAll()
方法来查找两个DataFrame之间的差异。对我来说,它在单元测试中特别有用。如果
exceptAll()
的结果是一个空的 DataFrame,则意味着两个 DataFrame 是相同的。否则,
exceptAll()
返回的非空 DataFrame 包含两个 DataFrame 之间不同的行。

# Find the differences between df1 and df2
diff = df1.exceptAll(df2)

# Check if there are any differences
if diff.count() == 0:
    print("DataFrames are identical.")
else:
    print("DataFrames have differences.")
    diff.show()

另请注意,

exceptAll()
同时根据列值和行顺序比较DataFrame,而
subtract()
仅根据列值比较DataFrame,忽略行顺序。如果性能是关键因素并且行顺序并不重要,则使用
subtract()
可能比
exceptAll()
更有效,但使用
subtract()
,您无法检测重复的行(如果有)。

© www.soinside.com 2019 - 2024. All rights reserved.