我有两个rdd
rdd1 =[('1', 3428), ('2', 2991), ('3', 2990), ('4', 2883), ('5', 2672), ('5', 2653)]
rdd2 = [['1', 'Toy Story (1995)'], ['2', 'Jumanji (1995)'], ['3', 'Grumpier Old Men (1995)']]
我想执行使第一rdd的第一元素与第二rdd的第二元素相关的操作
我的最终结果将是这样
[(''Toy Story (1995)'', 3428), ('Jumanji (1995)', 2991), ('Grumpier Old Men (1995)', 2990)]
请向我介绍执行此操作的方法
使用联接和映射:
rdd1.join(rdd2).map(lambda x: (x[1][1], x[1][0])).collect()
#[('Toy Story (1995)', 3428),
# ('Jumanji (1995)', 2991),
# ('Grumpier Old Men (1995)', 2990)]
您可以为此使用列表理解:
>>> [(y[1], x[1]) for x in rdd1 for y in rdd2 if x[0] == y[0]]
[('Toy Story (1995)', 3428),
('Jumanji (1995)', 2991),
('Grumpier Old Men (1995)', 2990)]
如果在集群上处理大型数据以提高性能,也可以使用广播和数据帧操作来完成它>
df_points = spark.createDataFrame(rdd1, schema=['index', 'points']) df_movie = spark.createDataFrame(rdd2, schema=['index', 'Movie']) df_join = df_points.join(broadcast(df_movie), on='index').select("Movie","points")
或者您也可以根据需要转换回RDD
df_join.rdd.map(list).collect()