按值过滤 RDD PySpark

问题描述 投票:0回答:1

我正在使用 PySpark,我正在寻找一种方法来检查:

对于给定的 check_number = 01

如果我的

rdd1
中第三个元素的值不包含check_number ==> 从
rdd2
..

获取有关此 check_number 的所有信息

鉴于:

rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=',
                        u'serviceXXX',
                        u'testAB_02',
                        u'2016-07-03')])

假设第一个元素是

ID
,第二个是服务名称,第三个是测试名称,带有
ID
,第四个元素是日期。

rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a',
                        u'01',
                        u'2016-11-02'),

                       (u'XXXX52547412558933nnBlmquhdyhM',
                        u'02',
                        u'2016-11-04')])

假设第一个元素是 ID,第二个元素是测试 ID,最后一个元素是日期。

所以,我的

rdd1
testAB_02
与我的 check_number 不匹配(因此服务名称必须以 check_number 的值结尾)。我的目标是从
rdd2
获取所有行,并以
01
作为测试 ID。这里的预期输出必须是:

[(u'9b023b8233c242c09b93506942002e0a',
  u'01',
  u'2016-11-02')

这是我的代码:

def update_typesdecohorte_table(rdd1, rdd2):

    if rdd1.filter(lambda x : (re.match('.*?' + check_number, x[2]))).isEmpty() is True:

        new_rdd2 = rdd2.filter(lambda x : x[1] == check_number)

    else:

         pass

    return new_rdd2

new_rdd2 = update_typesdecohorte_table(rdd1, rdd2)

这给出了:

[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')]

这段代码可以工作,但我不喜欢这个方法。最有效的方法是什么?

apache-spark mapreduce pyspark apache-spark-sql rdd
1个回答
1
投票

如果你想从rdd2中获取所有在rdd1中没有匹配元素的记录,你可以使用

cartesian
:

new_rdd2 = rdd1.cartesian(rdd2)
    .filter(lambda r: not r[0][2].endswith(r[1][1]))
    .map(lambda r: r[1])

如果您的 check_number 是固定的,则在最后按此值过滤:

new_rdd2.filter(lambda r: r[1] == check_number).collect()

但是,如果您的 check_number 是固定的并且两个 RDD 都很大,那么它甚至会比您的解决方案慢,因为它需要在连接期间对分区进行洗牌(您的代码仅执行非洗牌转换)。

© www.soinside.com 2019 - 2024. All rights reserved.