我想迭代比较 PySpark 数据框中的 2 组行,并找到另一列中的共同值。例如,我有下面的数据框 (
df
)。
Column1 Column2
abc 111
def 666
def 111
tyu 777
abc 777
def 222
tyu 333
ewq 888
我想要的输出是
abc,def,CommonRow <-- because of 111
abc,ewq,NoCommonRow
abc,tyu,CommonRow <-- because of 777
def,ewq,NoCommonRow
def,tyu,NoCommonRow
ewq,tyu,NoCommonRow
我当前用于执行此操作的 PySpark 代码是
# "value_list" contains the unique list of values in Column 1
index = 0
for col1 in value_list:
index += 1
df_col1 = df.filter(df.Column1 == col1)
for col2 in value_list[index:]:
df_col2 = df.filter(df.Column1 == col2)
df_join = df_col1.join(df_col2, on=(df_col1.Column2 == df_col2.Column2), how="inner")
if df_join.limit(1).count() == 0: # No common row
print(col1,col2,"NoCommonRow")
else:
print(col1,col2,"CommonRow")
但是,我发现这需要很长时间才能运行(
df
有数百万行)。有没有办法优化它以使其运行得更快,或者有更好的方法来进行比较?
自连接数据框以创建行对
df1 = df.alias('l').join(df.alias('r'), on=F.expr("l.column1 != r.column1"))
# df1.show()
# +-------+-------+-------+-------+
# |Column1|Column2|Column1|Column2|
# +-------+-------+-------+-------+
# | abc| 111| def| 666|
# | abc| 111| def| 111|
# | abc| 111| tyu| 777|
# | abc| 111| def| 222|
# | abc| 111| tyu| 333|
# | abc| 111| ewq| 888|
# | def| 666| abc| 111|
# | def| 666| tyu| 777|
# | def| 666| abc| 777|
# | def| 666| tyu| 333|
# | def| 666| ewq| 888|
# | def| 111| abc| 111|
# | def| 111| tyu| 777|
# | def| 111| abc| 777|
# | def| 111| tyu| 333|
# | def| 111| ewq| 888|
# | tyu| 777| abc| 111|
# | tyu| 777| def| 666|
# | tyu| 777| def| 111|
# | tyu| 777| abc| 777|
# +-------+-------+-------+-------+
添加一个布尔值
flag
来检查左右数据帧中的column2值是否匹配。同样,添加一个 pair
列,它是左右数据帧中的第 1 列值的组合
df1 = df1.withColumn('flag', F.expr('l.column2 = r.column2'))
df1 = df1.withColumn('pair', F.array_sort(F.array('l.column1', 'r.column1')))
# df1.show()
# +-------+-------+-------+-------+-----+----------+
# |Column1|Column2|Column1|Column2| flag| pair|
# +-------+-------+-------+-------+-----+----------+
# | abc| 111| def| 666|false|[abc, def]|
# | abc| 111| def| 111| true|[abc, def]|
# | abc| 111| tyu| 777|false|[abc, tyu]|
# | abc| 111| def| 222|false|[abc, def]|
# | abc| 111| tyu| 333|false|[abc, tyu]|
# | abc| 111| ewq| 888|false|[abc, ewq]|
# | def| 666| abc| 111|false|[abc, def]|
# | def| 666| tyu| 777|false|[def, tyu]|
# | def| 666| abc| 777|false|[abc, def]|
# | def| 666| tyu| 333|false|[def, tyu]|
# | def| 666| ewq| 888|false|[def, ewq]|
# | def| 111| abc| 111| true|[abc, def]|
# | def| 111| tyu| 777|false|[def, tyu]|
# | def| 111| abc| 777|false|[abc, def]|
# | def| 111| tyu| 333|false|[def, tyu]|
# | def| 111| ewq| 888|false|[def, ewq]|
# | tyu| 777| abc| 111|false|[abc, tyu]|
# | tyu| 777| def| 666|false|[def, tyu]|
# | tyu| 777| def| 111|false|[def, tyu]|
# | tyu| 777| abc| 777| true|[abc, tyu]|
# +-------+-------+-------+-------+-----+----------+
按
pair
对数据帧进行分组,并与 max
进行聚合,以检查给定 pair
组中是否有任何行将标志设置为 True
df1 = df1.groupBy('pair').agg(F.max('flag').alias('has_common_row'))
# df1.show()
# +----------+--------------+
# | pair|has_common_row|
# +----------+--------------+
# |[abc, def]| true|
# |[abc, tyu]| true|
# |[abc, ewq]| false|
# |[def, tyu]| false|
# |[def, ewq]| false|
# |[ewq, tyu]| false|
# +----------+--------------+