PySpark FuzzyWuzzy UDF 在小数据集上导致超时错误/在 PySpark 中过滤具有 Fuzzy Wuzzy 相似度分数的列时出现超时错误

问题描述 投票:0回答:1

我正在开发一个 PySpark 脚本,以使用 FuzzyWuzzy 计算列之间的相似度分数。我为此定义了一个 UDF,并使用 for 循环来迭代元数据表中指定的列,将相似度分数存储在同一 DataFrame 中。 这是我定义的 UDF:

similarity_udf = F.udf(lambda x, y: fuzz.ratio(x, y), IntegerType())

我有一个元数据表,其中列出了我需要计算相似度分数的列对。这是简化的代码:

meta_data = [
    ('column1', 'column2'),
    ('column3', 'column4'),
    # More column pairs
]
 
for col1, col2 in meta_data:
    df = df.withColumn(f'{col1}_{col2}_similarity', similarity_udf(F.col(col1), F.col(col2)))

计算相似度分数后,我过滤 DataFrame 以显示所有相似度列的分数大于 95。但是,当我运行

show()
count()
,或尝试将数据加载到 Snowflake 时,我遇到超时错误:

Python异常:

An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Python\Lib\socket.py", line 709, in readinto
    raise
TimeoutError: timed out
  • DataFrame 中只有 8 条记录。
  • 我尝试更改 Spark 会话配置,但没有解决问题。

技术和版本:

  • Python 3.11.6
  • Java 版本“1.8.0_371”
  • Java(TM) SE 运行时环境(版本 1.8.0_371-b11)
  • Java HotSpot(TM) 64 位服务器 VM(内部版本 25.371-b11,混合模式)
  • PySpark 版本 3.5.0
  • 使用 Scala 版本 2.12.18、Java HotSpot(TM) 64 位服务器 VM、1.8.0_371

问题:

  • 我使用 for 循环迭代列对并在 PySpark 中应用 UDF 的方法是否正确?
  • 为什么即使数据集这么小,show() 和 count() 操作也可能导致超时?
  • 是否有任何最佳实践或替代方法来计算和过滤 PySpark 中多个列对的相似度分数?
  • 如何调试或解决此超时问题以成功将数据加载到 Snowflake 中?
python dataframe pyspark fuzzywuzzy
1个回答
0
投票

不确定可能是什么问题。 fuzzywuzzy 有一个新家 一个新的包名称 thefuzz。

也许首先尝试读取包含数据的 csv 文件并写入文件。

如果上述工作流程有效,则表明您的问题出在 与雪花接口。

https://github.com/seatgeek/thefuzz

pip install thefuzz

以下代码适用于小数据。

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from thefuzz import fuzz


spark = SparkSession.builder.appName("example").getOrCreate()

data = [
    (1, "apple", "apple", "banana", "banana"),
    (2, "orange", "orange juice", "grape", "grapefruit"),
    (3, "berry", "blueberry", "melon", "watermelon")
]

columns = ["id", "column1", "column2", "column3", "column4"]

df = spark.createDataFrame(data, columns)

similarity_udf = F.udf(lambda x, y: fuzz.ratio(x, y), IntegerType())

meta_data = [
    ('column1', 'column2'),
    ('column3', 'column4'),
]

for col1, col2 in meta_data:
    similarity_col_name = f'{col1}_{col2}_similarity'
    df = df.withColumn(similarity_col_name, similarity_udf(F.col(col1), F.col(col2)))


df.show()

输出:

+---+-------+------------+-------+----------+--------------------------+--------------------------+
| id|column1|     column2|column3|   column4|column1_column2_similarity|column3_column4_similarity|
+---+-------+------------+-------+----------+--------------------------+--------------------------+
|  1|  apple|       apple| banana|    banana|                       100|                       100|
|  2| orange|orange juice|  grape|grapefruit|                        67|                        67|
|  3|  berry|   blueberry|  melon|watermelon|                        71|                        67|
+---+-------+------------+-------+----------+--------------------------+--------------------------+
© www.soinside.com 2019 - 2024. All rights reserved.