Pyspark MS Fabric SparkJob 的余弦相似度矩阵

问题描述 投票:0回答:1

我在计算产品推荐器的一些余弦相似度时遇到一些问题。我有一个文章数据库,其中包含 4 万篇文章,每篇文章都带有描述。我正在尝试计算此元素的余弦相似度矩阵,以便当给出任何文章时,可以检索根据描述最相似的前 N 篇文章。

我正在使用 Python 开发此代码,代码应该在 Microsoft Azure Function 中运行。正如您可以想象的那样,给定初始 df 的大小,这无法在该平台上运行(它会耗尽内存,因为输出的余弦相似度的大小就超过 8 GB)。

考虑到这一点,我决定采取另一种方法,即使用 Spark,更准确地说,是使用 Microsoft Fabric 的 Spark 作业定义。我一直在审查类似的问题(因为我的 Pyspark 知识极其有限),例如:Calculate cosine approximation in Pyspark Dataframe,但我正在努力使代码在我的情况下工作。

尝试一一解决问题时,我意识到我的代码甚至没有进行余弦相似度计算(或至少不是全部),因为我的 Spark 作业在尝试矩阵交叉连接时被冻结,并且我无法检查stderr 日志中的确切问题是什么。

我知道我的 40k 行的起始 df 相当大,这意味着 40k x 40k cos sim 矩阵,但我在 Google Collab 笔记本上使用纯 Python 尝试了此操作,它能够在大约 10 分钟内计算出来(它是确实,我必须使用 TPU 后端,因为普通 CPU 在消耗标准内存(12GB-)后也会崩溃。 此时我在想:

-MS Fabric SparkJobs 也不是满足我的需求的有效工具,因为它们提供了 Spark 的“弱/上限”版本。

-我的 Pyspark 代码显然有一些不正确的地方,导致 Spark 执行器错误地停用。

我需要一些帮助来确定我处于哪种情况(以及如果是第二种情况如何改进代码)。

让我分享一些代码。 这是用于相同任务的 Python 代码,实际上可以在 TPU Collab 笔记本上运行(使用 sklearn 导入):

    articulos = articulos[['article_id','product_code', 'detail_desc']]
    articulos_unicos = articulos.drop_duplicates(subset=['product_code'])
    articulos_final = articulos_unicos.dropna(subset=['detail_desc'])
    articulos_final = articulos_final.reset_index(drop=True)

    count = CountVectorizer(stop_words='english')
    count_matrix = count.fit_transform(articulos_final['detail_desc'])
    count_matrix = count_matrix.astype(np.float32)

    similitud_coseno = cosine_similarity(count_matrix, count_matrix)
    

    np.fill_diagonal(similitud_coseno, -1)

    top_5_simart = []

    
    for i in range(similitud_coseno.shape[0]):
        top_indices = np.argpartition(similitud_coseno[i], -5)[-5:] 
        sorted_top_indices = top_indices[np.argsort(-similitud_coseno[i, top_indices])] 
        top_5_simart.append(sorted_top_indices.tolist())

    with open('top_5_simart.json', 'w') as f:
        json.dump(top_5_simart, f)

另一方面,这是我正在尝试实现的 Pyspark 代码:

    # Selecting the necessary columns
    articulos = articulos.select("article_id", "product_code", "detail_desc")

    # Removing duplicates
    articulos = articulos.dropDuplicates(["product_code"]).dropna(subset=["detail_desc"])

    # Tokenizing the description text
    tokenizer = Tokenizer(inputCol="detail_desc", outputCol="words")
    articulos_tokenized = tokenizer.transform(articulos)

    # Removing stopwords
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    articulos_clean = remover.transform(articulos_tokenized)

    # Generating the feature column with CountVectorizer
    vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features")
    vectorizer_model = vectorizer.fit(articulos_clean)
    articulos_final = vectorizer_model.transform(articulos_clean)
    articulos_final = articulos_final.select("article_id","features")
    print(articulos_final.dtypes)
    print(articulos_final.schema)
    
    # Using crossJoin to obatain all the pairwise values for the cosine sim
    articulos_final2 = articulos_final.withColumnRenamed("article_id","article_id2").withColumnRenamed("features","features2")
    articulos_final_cos_sim = articulos_final.crossJoin(articulos_final2)
    articulos_final.unpersist()
    articulos_final2.unpersist()
    articulos_final_cos_sim.write.mode('overwrite').format('delta').save(cspath)
    
    '''
    # Realizar el crossJoin para obtener todas las combinaciones de pares de filas
    articulos_final_cos_sim = (
    articulos_final.alias("a")
    .crossJoin(articulos_final.alias("b"))
    .withColumn(
        "dot_product", 
        F.col("a.features").dot(F.col("b.features"))  # Producto punto de los vectores
        )
    .withColumn(
        "norm_a", 
        F.expr("a.features.norm(2)")  # Norma L2 del vector 'a'
        )
    .withColumn(
        "norm_b", 
        F.expr("b.features.norm(2)")  # Norma L2 del vector 'b'
        )
    .withColumn(
        "cosine_similarity",
        F.col("dot_product") / (F.col("norm_a") * F.col("norm_b"))  # Similitud coseno
        )
    )

    # Eliminar las columnas innecesarias
    articulos_final_cos_sim = articulos_final_cos_sim.drop("dot_product", "norm_a", "norm_b")

    # Agrupar para construir la matriz de similitud
    articulos_final_cos_sim = articulos_final_cos_sim.groupBy("a.article_id").pivot("b.article_id").sum("cosine_similarity")

    # Guardar los resultados
    articulos_final_cos_sim.write.mode('overwrite').format('delta').save(testpath2)


    
    # Filtrar para obtener los 5 artículos más similares para cada uno
    windowSpec = Window.partitionBy("article_id").orderBy(F.col("cosine_similarity").desc())

    top_5_simart = articulos_final_cos_sim.withColumn("rank", F.row_number().over(windowSpec)).filter(F.col("rank") <= 5)

    # Guardar los resultados en formato JSON
    top_5_simart.write.mode('overwrite').json(Top5SimartPath)
    '''

正如您所看到的,代码的最后一部分被注释了,因为(经过多次尝试)该代码没有给我任何导致执行失败的错误,它只是花了很长时间才执行。代码的活动部分仅执行交叉连接,但不执行任何计算,是的,正如我提到的,它永远卡住了。

如果需要我还可以提供Spark的stderror日志。

python pyspark azure-functions cosine-similarity microsoft-fabric
1个回答
0
投票

交叉连接为您提供

1,60,00,00,000
行 然后你对其进行分组,这会进行多次洗牌并需要更多的时间和内存。

所以,我的想法是在加入之前进行分区并使用更多数量的节点/工作线程和核心。

这是我在 Spark 环境中使用的示例代码,包含 40k 随机数据行。

dot_udf = F.udf(lambda  v1, v2: float(v1.dot(v2)), DoubleType())
# Define UDF to compute the L2 norm of a vector
norm_udf = F.udf(lambda  v: float(v.norm(2)), DoubleType())

a = articulos_final.repartition(40000,"article_id")
b = articulos_final.repartition(40000,"article_id")

articulos_final_cos_sim = (
    a.alias("a")
    .crossJoin(b.alias("b"))
    .withColumn("dot_product", dot_udf(F.col("a.features"), F.col("b.features")))  # Dot product
    .withColumn("norm_a", norm_udf(F.col("a.features")))  # L2 norm of vector 'a'
    .withColumn("norm_b", norm_udf(F.col("b.features")))  # L2 norm of vector 'b'
    .withColumn(
        "cosine_similarity",
        F.col("dot_product") / (F.col("norm_a") * F.col("norm_b"))  # Cosine similarity
    )
)

final1 = articulos_final_cos_sim.groupBy("a.article_id").pivot("b.article_id").sum("cosine_similarity")

final1.select("article_id","39848").sort("39848",ascending=False).show(5)

这里,对于文章

39848
计算了前5个相似文档,运行时间约为29分钟,有10个工人,每个16个核心,总共160个核心

输出:

+----------+------------------+
|article_id|             39848|
+----------+------------------+
|     39848|               1.0|
|     20088|0.3333333333333333|
|      6257|            0.3125|
|     21005|0.2886751345948129|
|     29086|0.2886751345948129|
+----------+------------------+
only showing top 5 rows

enter image description here

在您的结构环境中增加计算大小并运行上面的代码。

还有,如果你经常操作,一定要坚持。

© www.soinside.com 2019 - 2024. All rights reserved.