在 pyspark 中更新插入/合并两个数据帧

问题描述 投票:0回答:1

我需要一个帮助来满足以下要求。这仅适用于示例数据。在实时用例中,每个数据帧中有超过 200 列。我需要比较两个数据帧并标记差异。

df1

id, name,  city
1,  abc,   pune
2,  xyz,   noida

df2

id,  name,  city
1,   abc,   pune
2,   xyz,   bangalore
3,   kk,    mumbai

预期数据框

id, name, city,      flag
1,  abc,  pune,      same
2,  xyz,  bangalore, update
3,  kk,   mumbai,    new

有人可以帮我在 pyspark 中构建逻辑吗?

提前致谢。

apache-spark pyspark apache-spark-sql
1个回答
2
投票

Pyspark 的哈希函数可以帮助识别不同的记录。

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.hash.html

from pyspark.sql.functions import col, hash

df1 = df1.withColumn('hash_value', hash('id', 'name', 'city') 
df2 = df2.withColumn('hash_value', hash('id', 'name', 'city') 

df_updates = df1 .alias('a').join(df2.alias('b'), (\
            (col('a.id') == col('b.id')) &\
            (col('a.hash_value') != col('b.hash_value')) \
            ) , how ='inner'
        )

df_updates = df_updates.select(b.*) 

一旦您确定了不同的记录。

然后您将能够设置一个函数,可以循环遍历 df 中的每一列来比较该列的值。

这样的东西应该有效



def add_change_flags(df1, df2):
   df_joined = df1.join(df2, 'id', how='inner')

   for column in df1.columns:
      df_joined = df_joined.withColumn(column + "_change_flag", \
            when(col(f"df1.{column}") === col(f"df2.{column}"),True)\
            .otherwise(False)) 

   return df_joined 

© www.soinside.com 2019 - 2024. All rights reserved.