优化 PySpark 代码以进行行比较

问题描述 投票:0回答:1

我想迭代比较 PySpark 数据框中的 2 组行,并找到另一列中的共同值。例如,我有下面的数据框 (

df
)。

Column1  Column2
abc      111
def      666
def      111
tyu      777
abc      777
def      222
tyu      333
ewq      888

我想要的输出是

abc,def,CommonRow  <-- because of 111
abc,ewq,NoCommonRow
abc,tyu,CommonRow  <-- because of 777
def,ewq,NoCommonRow
def,tyu,NoCommonRow
ewq,tyu,NoCommonRow

我当前用于执行此操作的 PySpark 代码是

# "value_list" contains the unique list of values in Column 1
index = 0
for col1 in value_list:
    index += 1
    df_col1 = df.filter(df.Column1 == col1)
    for col2 in value_list[index:]:
        df_col2 = df.filter(df.Column1 == col2)

        df_join = df_col1.join(df_col2, on=(df_col1.Column2 == df_col2.Column2), how="inner")
        if df_join.limit(1).count() == 0:   # No common row
            print(col1,col2,"NoCommonRow")
        else:
            print(col1,col2,"CommonRow")

但是,我发现这需要很长时间才能运行(

df
有数百万行)。有没有办法优化它以使其运行得更快,或者有更好的方法来进行比较?

python dataframe pyspark optimization
1个回答
0
投票

自连接数据框以创建行对

df1 = df.alias('l').join(df.alias('r'), on=F.expr("l.column1 != r.column1"))

# df1.show()
# +-------+-------+-------+-------+
# |Column1|Column2|Column1|Column2|
# +-------+-------+-------+-------+
# |    abc|    111|    def|    666|
# |    abc|    111|    def|    111|
# |    abc|    111|    tyu|    777|
# |    abc|    111|    def|    222|
# |    abc|    111|    tyu|    333|
# |    abc|    111|    ewq|    888|
# |    def|    666|    abc|    111|
# |    def|    666|    tyu|    777|
# |    def|    666|    abc|    777|
# |    def|    666|    tyu|    333|
# |    def|    666|    ewq|    888|
# |    def|    111|    abc|    111|
# |    def|    111|    tyu|    777|
# |    def|    111|    abc|    777|
# |    def|    111|    tyu|    333|
# |    def|    111|    ewq|    888|
# |    tyu|    777|    abc|    111|
# |    tyu|    777|    def|    666|
# |    tyu|    777|    def|    111|
# |    tyu|    777|    abc|    777|
# +-------+-------+-------+-------+

添加一个布尔值

flag
来检查左右数据帧中的column2值是否匹配。同样,添加一个
pair
列,它是左右数据帧中的第 1 列值的组合

df1 = df1.withColumn('flag', F.expr('l.column2 = r.column2'))
df1 = df1.withColumn('pair', F.array_sort(F.array('l.column1', 'r.column1')))

# df1.show()
# +-------+-------+-------+-------+-----+----------+
# |Column1|Column2|Column1|Column2| flag|      pair|
# +-------+-------+-------+-------+-----+----------+
# |    abc|    111|    def|    666|false|[abc, def]|
# |    abc|    111|    def|    111| true|[abc, def]|
# |    abc|    111|    tyu|    777|false|[abc, tyu]|
# |    abc|    111|    def|    222|false|[abc, def]|
# |    abc|    111|    tyu|    333|false|[abc, tyu]|
# |    abc|    111|    ewq|    888|false|[abc, ewq]|
# |    def|    666|    abc|    111|false|[abc, def]|
# |    def|    666|    tyu|    777|false|[def, tyu]|
# |    def|    666|    abc|    777|false|[abc, def]|
# |    def|    666|    tyu|    333|false|[def, tyu]|
# |    def|    666|    ewq|    888|false|[def, ewq]|
# |    def|    111|    abc|    111| true|[abc, def]|
# |    def|    111|    tyu|    777|false|[def, tyu]|
# |    def|    111|    abc|    777|false|[abc, def]|
# |    def|    111|    tyu|    333|false|[def, tyu]|
# |    def|    111|    ewq|    888|false|[def, ewq]|
# |    tyu|    777|    abc|    111|false|[abc, tyu]|
# |    tyu|    777|    def|    666|false|[def, tyu]|
# |    tyu|    777|    def|    111|false|[def, tyu]|
# |    tyu|    777|    abc|    777| true|[abc, tyu]|
# +-------+-------+-------+-------+-----+----------+

pair
对数据帧进行分组,并与
max
进行聚合,以检查给定
pair
组中是否有任何行将标志设置为
True

df1 = df1.groupBy('pair').agg(F.max('flag').alias('has_common_row'))

# df1.show()
# +----------+--------------+
# |      pair|has_common_row|
# +----------+--------------+
# |[abc, def]|          true|
# |[abc, tyu]|          true|
# |[abc, ewq]|         false|
# |[def, tyu]|         false|
# |[def, ewq]|         false|
# |[ewq, tyu]|         false|
# +----------+--------------+
© www.soinside.com 2019 - 2024. All rights reserved.