我有 pyspark df,我根据 2 列自行加入
cluster_id
具有不同计数的不同簇,而 unique_id
每行都是唯一的。
df_filtered.repartition('cluster_id')
df_similar_cluster = df_filtered.alias('df1').join(
df_filtered.alias('df2'),
(F.col('df1.cluster_id') == F.col('df2.cluster_id')) &
(F.col('df1.unique_id') < F.col('df2.unique_id')),
'inner'
)
我尝试在
cluster_id
上重新分区,但仍然需要很多时间。
大约有 5k 个集群,每个集群都有不同数量的记录。
数据框中总共有 4M 条记录。
您的问题应该出现在第二个连接子句中,即
<
与 unique_id
的比较。这意味着在每个簇中(应该是 ~800
行),您将第一个 unique_id
与所有其他 799 个连接,第二个与 798 个连接,依此类推,直到 1。如果您查看 1 到 n 的总和在这里 ,您会看到这应该接近 n*(n+1)/2
,在您的情况下,每组应该约为 320,000 行。
将此值乘以集群数量,您会发现结果表中应该有大约 1.6B 行。
这当然是一个粗略的估计,但是您可以看到此连接的效果以及为什么生成的数据集比原始数据集大得多。如果您有兴趣在结果数据集中显示
unique_id
的每个 cluster_id
组合,那么这可能是无法避免的,但也许您可以在整个过程中查看这是否真的是您所需要的。因此,看看您之后做了什么,可能进行过滤,并看看在此加入之前是否可以改进事情。