我正在运行一个Pyspark程序,该程序可以正常运行。该过程的第一步是将特定的UDF应用于数据框。这是功能:
import html2text
class Udfs(object):
def __init__(self):
self.h2t = html2text.HTML2Text()
self.h2t.ignore_links = True
self.h2t.ignore_images = True
def extract_text(self, raw_text):
try:
texto = self.h2t.handle(raw_text)
except:
texto = "PARSE HTML ERROR"
return texto
这是我应用UDF的方式:
import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs
udfs = Udfs()
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))
它处理大约2900万行和300GB。问题在于某些任务需要太多时间来处理。任务的平均时间为:
其他任务的完成时间超过1小时。
但是有些任务需要太多时间来处理:
该过程在具有100个节点的群集中的EMR中运行,每个节点具有32gb的RAM和4个CPU。同时启用了火花推测。
这些任务在哪里?UDF有问题吗?这是线程问题吗?
我的直觉是您使用的分区过多。我将尝试通过显着减少其数量来进行尝试。
如果您的分区是平衡的,则按分区平均有29 millions /80k partitions = 362
个观测值。我想这还不够。您将花费大量时间安排任务而不是执行任务。
如果您没有均衡的分区,情况会变得更糟(请参阅here。通常会产生瓶颈,这在您的情况下似乎会发生。
您可以使用spark.sql.shuffle.partitions
和spark.default.parallelism
更改有关分区的默认值。您也可以将数据coalesce
分配到较少的分区数
根据我的经验猜测。很难找到足够的分区数量,但这是值得的。让我知道它是否有所帮助或您是否仍然遇到瓶颈。