我在计算产品推荐器的一些余弦相似度时遇到一些问题。我有一个文章数据库,其中包含 4 万篇文章,每篇文章都带有描述。我正在尝试计算此元素的余弦相似度矩阵,以便当给出任何文章时,可以检索根据描述最相似的前 N 篇文章。
我正在使用 Python 开发此代码,代码应该在 Microsoft Azure Function 中运行。正如您可以想象的那样,给定初始 df 的大小,这无法在该平台上运行(它会耗尽内存,因为输出的余弦相似度的大小就超过 8 GB)。
考虑到这一点,我决定采取另一种方法,即使用 Spark,更准确地说,是使用 Microsoft Fabric 的 Spark 作业定义。我一直在审查类似的问题(因为我的 Pyspark 知识极其有限),例如:Calculate cosine approximation in Pyspark Dataframe,但我正在努力使代码在我的情况下工作。
尝试一一解决问题时,我意识到我的代码甚至没有进行余弦相似度计算(或至少不是全部),因为我的 Spark 作业在尝试矩阵交叉连接时被冻结,并且我无法检查stderr 日志中的确切问题是什么。
我知道我的 40k 行的起始 df 相当大,这意味着 40k x 40k cos sim 矩阵,但我在 Google Collab 笔记本上使用纯 Python 尝试了此操作,它能够在大约 10 分钟内计算出来(它是确实,我必须使用 TPU 后端,因为普通 CPU 在消耗标准内存(12GB-)后也会崩溃。 此时我在想:
-MS Fabric SparkJobs 也不是满足我的需求的有效工具,因为它们提供了 Spark 的“弱/上限”版本。
-我的 Pyspark 代码显然有一些不正确的地方,导致 Spark 执行器错误地停用。
我需要一些帮助来确定我处于哪种情况(以及如果是第二种情况如何改进代码)。
让我分享一些代码。 这是用于相同任务的 Python 代码,实际上可以在 TPU Collab 笔记本上运行(使用 sklearn 导入):
articulos = articulos[['article_id','product_code', 'detail_desc']]
articulos_unicos = articulos.drop_duplicates(subset=['product_code'])
articulos_final = articulos_unicos.dropna(subset=['detail_desc'])
articulos_final = articulos_final.reset_index(drop=True)
count = CountVectorizer(stop_words='english')
count_matrix = count.fit_transform(articulos_final['detail_desc'])
count_matrix = count_matrix.astype(np.float32)
similitud_coseno = cosine_similarity(count_matrix, count_matrix)
np.fill_diagonal(similitud_coseno, -1)
top_5_simart = []
for i in range(similitud_coseno.shape[0]):
top_indices = np.argpartition(similitud_coseno[i], -5)[-5:]
sorted_top_indices = top_indices[np.argsort(-similitud_coseno[i, top_indices])]
top_5_simart.append(sorted_top_indices.tolist())
with open('top_5_simart.json', 'w') as f:
json.dump(top_5_simart, f)
另一方面,这是我正在尝试实现的 Pyspark 代码:
# Selecting the necessary columns
articulos = articulos.select("article_id", "product_code", "detail_desc")
# Removing duplicates
articulos = articulos.dropDuplicates(["product_code"]).dropna(subset=["detail_desc"])
# Tokenizing the description text
tokenizer = Tokenizer(inputCol="detail_desc", outputCol="words")
articulos_tokenized = tokenizer.transform(articulos)
# Removing stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
articulos_clean = remover.transform(articulos_tokenized)
# Generating the feature column with CountVectorizer
vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features")
vectorizer_model = vectorizer.fit(articulos_clean)
articulos_final = vectorizer_model.transform(articulos_clean)
articulos_final = articulos_final.select("article_id","features")
print(articulos_final.dtypes)
print(articulos_final.schema)
# Using crossJoin to obatain all the pairwise values for the cosine sim
articulos_final2 = articulos_final.withColumnRenamed("article_id","article_id2").withColumnRenamed("features","features2")
articulos_final_cos_sim = articulos_final.crossJoin(articulos_final2)
articulos_final.unpersist()
articulos_final2.unpersist()
articulos_final_cos_sim.write.mode('overwrite').format('delta').save(cspath)
'''
# Realizar el crossJoin para obtener todas las combinaciones de pares de filas
articulos_final_cos_sim = (
articulos_final.alias("a")
.crossJoin(articulos_final.alias("b"))
.withColumn(
"dot_product",
F.col("a.features").dot(F.col("b.features")) # Producto punto de los vectores
)
.withColumn(
"norm_a",
F.expr("a.features.norm(2)") # Norma L2 del vector 'a'
)
.withColumn(
"norm_b",
F.expr("b.features.norm(2)") # Norma L2 del vector 'b'
)
.withColumn(
"cosine_similarity",
F.col("dot_product") / (F.col("norm_a") * F.col("norm_b")) # Similitud coseno
)
)
# Eliminar las columnas innecesarias
articulos_final_cos_sim = articulos_final_cos_sim.drop("dot_product", "norm_a", "norm_b")
# Agrupar para construir la matriz de similitud
articulos_final_cos_sim = articulos_final_cos_sim.groupBy("a.article_id").pivot("b.article_id").sum("cosine_similarity")
# Guardar los resultados
articulos_final_cos_sim.write.mode('overwrite').format('delta').save(testpath2)
# Filtrar para obtener los 5 artículos más similares para cada uno
windowSpec = Window.partitionBy("article_id").orderBy(F.col("cosine_similarity").desc())
top_5_simart = articulos_final_cos_sim.withColumn("rank", F.row_number().over(windowSpec)).filter(F.col("rank") <= 5)
# Guardar los resultados en formato JSON
top_5_simart.write.mode('overwrite').json(Top5SimartPath)
'''
正如您所看到的,代码的最后一部分被注释了,因为(经过多次尝试)该代码没有给我任何导致执行失败的错误,它只是花了很长时间才执行。代码的活动部分仅执行交叉连接,但不执行任何计算,是的,正如我提到的,它永远卡住了。
如果需要我还可以提供Spark的stderror日志。
交叉连接为您提供
1,60,00,00,000
行
然后你对其进行分组,这会进行多次洗牌并需要更多的时间和内存。
所以,我的想法是在加入之前进行分区并使用更多数量的节点/工作线程和核心。
这是我在 Spark 环境中使用的示例代码,包含 40k 随机数据行。
dot_udf = F.udf(lambda v1, v2: float(v1.dot(v2)), DoubleType())
# Define UDF to compute the L2 norm of a vector
norm_udf = F.udf(lambda v: float(v.norm(2)), DoubleType())
a = articulos_final.repartition(40000,"article_id")
b = articulos_final.repartition(40000,"article_id")
articulos_final_cos_sim = (
a.alias("a")
.crossJoin(b.alias("b"))
.withColumn("dot_product", dot_udf(F.col("a.features"), F.col("b.features"))) # Dot product
.withColumn("norm_a", norm_udf(F.col("a.features"))) # L2 norm of vector 'a'
.withColumn("norm_b", norm_udf(F.col("b.features"))) # L2 norm of vector 'b'
.withColumn(
"cosine_similarity",
F.col("dot_product") / (F.col("norm_a") * F.col("norm_b")) # Cosine similarity
)
)
final1 = articulos_final_cos_sim.groupBy("a.article_id").pivot("b.article_id").sum("cosine_similarity")
final1.select("article_id","39848").sort("39848",ascending=False).show(5)
这里,对于文章
39848
计算了前5个相似文档,运行时间约为29分钟,有10个工人,每个16个核心,总共160个核心
输出:
+----------+------------------+
|article_id| 39848|
+----------+------------------+
| 39848| 1.0|
| 20088|0.3333333333333333|
| 6257| 0.3125|
| 21005|0.2886751345948129|
| 29086|0.2886751345948129|
+----------+------------------+
only showing top 5 rows
和
在您的结构环境中增加计算大小并运行上面的代码。
还有,如果你经常操作,一定要坚持。