为Spark RDD中的每个分区基于复合键获取最高值

问题描述 投票:0回答:1

我要使用以下rdd

rdd = sc.parallelize([("K1", "e", 9), ("K1", "aaa", 9), ("K1", "ccc", 3), ("K1", "ddd", 9),
("B1", "qwe", 4), ("B1", "rty", 7), ("B1", "iop", 8), ("B1", "zxc", 1)])

获取输出

[('K1', 'aaa', 9),
 ('K1', 'ddd', 9),
 ('K1', 'e', 9),
 ('B1', 'iop', 8),
 ('B1', 'rty', 7),
 ('B1', 'qwe', 4)]

我参考了Get Top 3 values for every key in a RDD in Spark,并使用了以下代码

from heapq import nlargest
rdd.groupBy(
    lambda x: x[0]
).flatMap(
    lambda g: nlargest(3, g[1], key=lambda x: (x[2],x[1]))
).collect()

但是,我只能得出

[('K1', 'e', 9),
 ('K1', 'ddd', 9),
 ('K1', 'aaa', 9),
 ('B1', 'iop', 8),
 ('B1', 'rty', 7),
 ('B1', 'qwe', 4)]

我该怎么办?

apache-spark pyspark rdd
1个回答
1
投票

实际上是一个排序问题,但是由于sortingshuffling在计算上非常昂贵。但是您可以尝试:

rdd2 = rdd.groupBy(
    lambda x: x[0]
).flatMap(
    lambda g: nlargest(3, g[1], key=lambda x: (x[2],x[1]))
)

rdd2.sortBy(lambda x: x[1], x[2]).collect()
# [('K1', 'aaa', 9), ('K1', 'ddd', 9), ('K1', 'e', 9), ('B1', 'iop', 8), ('B1', 'qwe', 4), ('B1', 'rty', 7)]

我已经使用元组的第一个和第二个值对其进行了排序。

[另外请注意,q在字母r之前。因此,您提到的预期输出不正确并且具有误导性。

© www.soinside.com 2019 - 2024. All rights reserved.