我得到了这个:
x | y
1 | a,b,c,d,e
2 | a,b,c,d
3 | a,c,d
...
我想要这个:
1,2 | 4 (a,b,c,d)
1,3 | 3 (a,c,d)
2,3 | 3 (a,c,d)
我有 3*10^6 这样的行(300 万条记录)
y 可以是 -> 5 到 200 之间
有没有办法确定列表顶部实体记录数量最多的结果?
我尝试在具有 3 个虚拟机(每个 20 个核心)的 Spark 集群上运行它。
20k 条记录:7 mb,360 万条记录:850 mb
要在不使用 CROSS JOIN(由于其计算复杂性)的情况下有效解决此问题,您可以利用 PySpark 中的替代方法。以下是如何进行的分步概述:
y
),并且您需要找到实体重叠最大的行对。y
值具有最大交集的行对,并按交集的大小对它们进行排名。考虑到数据的大小(300 万条记录,每条记录最多 200 个实体),像 CROSS JOIN 这样的强力方法是不可行的。相反,我们可以使用以下方法,将问题划分为可管理的块:
分块和广播策略:
n
交叉点。n
结果聚合到全局 top 列表中。使用高效的集合交集:
y
值视为一组。通过对减少优化:
这是 PySpark 中的一个实现,它可以解决问题并避免 CROSS JOIN:
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, explode, size, col
from itertools import combinations
# Start Spark session
spark = SparkSession.builder.appName("EntityOverlap").getOrCreate()
# Sample data (replace with your real data)
data = [
(1, "a,b,c,d,e"),
(2, "a,b,c,d"),
(3, "a,c,d"),
# Add all your 3 million rows here
]
# Convert data into a DataFrame
df = spark.createDataFrame(data, ["x", "y"])
# Step 1: Convert the 'y' column into a set of values
df = df.withColumn("y_set", explode(col("y").split(",")))
# Step 2: Group by the 'x' column and collect sets of entities
df = df.groupBy("x").agg(collect_set("y_set").alias("entities"))
# Step 3: Find pairs of rows with common entities
def get_combinations(rows):
results = []
for (x1, entities1), (x2, entities2) in combinations(rows, 2):
common_entities = set(entities1).intersection(set(entities2))
if common_entities:
results.append((x1, x2, len(common_entities), ",".join(common_entities)))
return results
# Collect the data into a list and apply the combination function
pairs = df.collect()
pair_results = get_combinations(pairs)
# Convert back to a DataFrame
columns = ["x1", "x2", "common_count", "common_entities"]
pair_df = spark.createDataFrame(pair_results, columns)
# Step 4: Order the results by the intersection size
pair_df = pair_df.orderBy(col("common_count").desc())
# Step 5: Show the top results
pair_df.show(truncate=False)
数据准备:
y
列值(以逗号分隔)并将它们分解为单独的值。x
标识符的集合中(这可以防止同一组中出现重复的实体)。配对行:
itertools.combinations
,我们生成所有行对并计算它们的实体集的交集。高效分组:
排序:
intersection
)有助于最大限度地减少比较的开销。您提到将数据分块为可管理的大小(例如 20k 记录)。这可以按如下方式集成:
df_small = df.limit(20000) # Create a smaller broadcastable dataset
df_large = df.exceptAll(df_small) # Exclude the small chunk from the main dataset
# Broadcast small dataset
df_small_broadcast = spark.sparkContext.broadcast(df_small.collect())
# Map over large dataset and compute intersections with the broadcasted small set
results_rdd = df_large.rdd.flatMap(lambda row: get_combinations_with_broadcast(row, df_small_broadcast.value))
# Collect final top results
final_top_results = results_rdd.takeOrdered(100, key=lambda x: -x[2]) # Top 100 intersections
这种方法将有助于跨大型集群有效地扩展解决方案。
如果您需要进一步说明,请告诉我!