如何在pySpark中distnct后进行压缩

问题描述 投票:0回答:2

以下程序在 zip 步骤中失败。

x = sc.parallelize([1, 2, 3, 1, 2, 3])
y = sc.parallelize([1, 2, 3])
z = x.distinct()
print x.zip(y).collect()

产生的错误取决于是否指定了多个分区。

我明白了

两个 RDD [必须]具有相同数量的分区,并且每个分区中的元素数量相同。

解决此限制的最佳方法是什么?

我一直在使用以下代码执行操作,但我希望找到更有效的东西。

def safe_zip(left, right):
    ix_left = left.zipWithIndex().map(lambda row: (row[1], row[0]))
    ix_right = right.zipWithIndex().map(lambda row: (row[1], row[0]))
    return ix_left.join(ix_right).sortByKey().values()
pyspark
2个回答
0
投票

我认为这可以通过在 RDD 上使用 cartesian() 来完成

import pyspark
x = sc.parallelize([1, 2, 3, 1, 2, 3])
y = sc.parallelize([1, 2, 3])
x.distinct().cartesian(y.distinct()).collect()

0
投票

可以使用合并重新分区:

x_repartitioned = x.coalesce(y.getNumPartitions())  # Match partitions of y
z = x_repartitioned.zip(y).collect()
© www.soinside.com 2019 - 2024. All rights reserved.