计算 350 万个 ID 组合中同时出现的实体的最大数量的函数?

问题描述 投票:0回答:1

我得到了这个:

x | y

1 | a,b,c,d,e

2 | a,b,c,d

3 | a,c,d
...

我想要这个:

1,2 | 4 (a,b,c,d)

1,3 | 3 (a,c,d)

2,3 | 3 (a,c,d)

我有 3*10^6 这样的行(300 万条记录)

y 可以是 -> 5 到 200 之间

有没有办法确定列表顶部实体记录数量最多的结果?

  • 交叉连接不是一个选项

我尝试在具有 3 个虚拟机(每个 20 个核心)的 Spark 集群上运行它。

  1. 将大小截断为 20k 记录(10^8 次操作)
  2. 运行时间约为 3 分钟。
  3. 将 20k 记录的副本提供给每台机器并将数据划分为 100 个块。将每个块传递给 vm 并与 20k 记录进行连接以获取前 100 个本地记录。
  4. 相应更新全球前 100 名。

20k 条记录:7 mb,360 万条记录:850 mb

apache-spark pyspark parallel-processing distributed-computing
1个回答
0
投票

要在不使用 CROSS JOIN(由于其计算复杂性)的情况下有效解决此问题,您可以利用 PySpark 中的替代方法。以下是如何进行的分步概述:

问题分解

  • 您有一组行,其中每条记录都包含一组实体 (
    y
    ),并且您需要找到实体重叠最大的行对。
  • 目标是确定
    y
    值具有最大交集的行对,并按交集的大小对它们进行排名。

方法

考虑到数据的大小(300 万条记录,每条记录最多 200 个实体),像 CROSS JOIN 这样的强力方法是不可行的。相反,我们可以使用以下方法,将问题划分为可管理的块:

  1. 分块和广播策略:

    • 将数据划分为更小、更易于管理的块。
    • 对每个块执行本地操作以找到顶部
      n
      交叉点。
    • 使用分布式策略,将较小的块广播到每台机器,从而最大限度地减少数据移动的大小。
    • 将本地 top
      n
      结果聚合到全局 top 列表中。
  2. 使用高效的集合交集

    • 对于每一行,将
      y
      值视为一组。
    • 对于每对行,计算交集大小。
    • 仅保留有交叉点的对,跳过没有重叠的对。
  3. 通过对减少优化

    • 通过使用基于散列的结构来跟踪行对之间的重叠,避免重新计算交集。
    • 使用 Spark 的 RDD 或 DataFrame 操作而不是完全连接来降低复杂性。

代码实现

这是 PySpark 中的一个实现,它可以解决问题并避免 CROSS JOIN:

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, explode, size, col
from itertools import combinations

# Start Spark session
spark = SparkSession.builder.appName("EntityOverlap").getOrCreate()

# Sample data (replace with your real data)
data = [
    (1, "a,b,c,d,e"),
    (2, "a,b,c,d"),
    (3, "a,c,d"),
    # Add all your 3 million rows here
]

# Convert data into a DataFrame
df = spark.createDataFrame(data, ["x", "y"])

# Step 1: Convert the 'y' column into a set of values
df = df.withColumn("y_set", explode(col("y").split(",")))

# Step 2: Group by the 'x' column and collect sets of entities
df = df.groupBy("x").agg(collect_set("y_set").alias("entities"))

# Step 3: Find pairs of rows with common entities
def get_combinations(rows):
    results = []
    for (x1, entities1), (x2, entities2) in combinations(rows, 2):
        common_entities = set(entities1).intersection(set(entities2))
        if common_entities:
            results.append((x1, x2, len(common_entities), ",".join(common_entities)))
    return results

# Collect the data into a list and apply the combination function
pairs = df.collect()
pair_results = get_combinations(pairs)

# Convert back to a DataFrame
columns = ["x1", "x2", "common_count", "common_entities"]
pair_df = spark.createDataFrame(pair_results, columns)

# Step 4: Order the results by the intersection size
pair_df = pair_df.orderBy(col("common_count").desc())

# Step 5: Show the top results
pair_df.show(truncate=False)

说明:

  1. 数据准备:

    • 我们首先拆分
      y
      列值(以逗号分隔)并将它们分解为单独的值。
    • 然后,将这些值重新分组到每个
      x
      标识符的集合中(这可以防止同一组中出现重复的实体)。
  2. 配对行:

    • 使用Python的
      itertools.combinations
      ,我们生成所有行对并计算它们的实体集的交集。
    • 对于每一对,我们计算交集的大小并仅保留有重叠的对。
  3. 高效分组:

    • 由于您正在处理大型数据集,因此对的计算是在每个块上完成的,从而允许并行处理。
    • 您可以通过直接计算相关对的交集来避免完全交叉连接。
  4. 排序

    • 最后,结果按共同实体的数量降序排序,因此重叠度最高的对出现在顶部。

性能考虑因素:

  • 分块:由于您正在处理 300 万条记录,因此请将数据集划分为块(例如,基于分区键)并将较小的数据集(例如,20k 条记录)广播到每个工作节点。
  • 广播连接:通过向每个工作人员广播较小的数据集,可以避免昂贵的交叉连接。
  • 设置操作:使用本机设置操作(如
    intersection
    )有助于最大限度地减少比较的开销。

大规模优化示例:

您提到将数据分块为可管理的大小(例如 20k 记录)。这可以按如下方式集成:

df_small = df.limit(20000)  # Create a smaller broadcastable dataset
df_large = df.exceptAll(df_small)  # Exclude the small chunk from the main dataset

# Broadcast small dataset
df_small_broadcast = spark.sparkContext.broadcast(df_small.collect())

# Map over large dataset and compute intersections with the broadcasted small set
results_rdd = df_large.rdd.flatMap(lambda row: get_combinations_with_broadcast(row, df_small_broadcast.value))

# Collect final top results
final_top_results = results_rdd.takeOrdered(100, key=lambda x: -x[2])  # Top 100 intersections

这种方法将有助于跨大型集群有效地扩展解决方案。

如果您需要进一步说明,请告诉我!

© www.soinside.com 2019 - 2024. All rights reserved.