为什么PySpark任务花费太多时间?

问题描述 投票:-3回答:1

我正在运行一个Pyspark程序,该程序可以正常运行。该过程的第一步是将特定的UDF应用于数据框。这是功能:

import html2text

class Udfs(object):
    def __init__(self):
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.h2t.ignore_images = True

    def extract_text(self, raw_text):
        try:
            texto = self.h2t.handle(raw_text)
        except:
            texto = "PARSE HTML ERROR"
        return texto

这是我应用UDF的方式:

import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs

udfs = Udfs()
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))

它处理大约2900万行和300GB。问题在于某些任务需要太多时间来处理。任务的平均时间为:

average times

其他任务的完成时间超过1小时。

但是有些任务需要太多时间来处理:

task time

该过程在具有100个节点的群集中的EMR中运行,每个节点具有32gb的RAM和4个CPU。同时启用了火花推测。

这些任务在哪里?UDF有问题吗?这是线程问题吗?

python apache-spark pyspark bigdata user-defined-functions
1个回答
0
投票

我的直觉是您使用的分区过多。我将尝试通过显着减少其数量来进行尝试。

如果您的分区是平衡的,则按分区平均有29 millions /80k partitions = 362个观测值。我想这还不够。您将花费大量时间安排任务而不是执行任务。

如果您没有均衡的分区,情况会变得更糟(请参阅here。通常会产生瓶颈,这在您的情况下似乎会发生。

您可以使用spark.sql.shuffle.partitionsspark.default.parallelism更改有关分区的默认值。您也可以将数据coalesce分配到较少的分区数

根据我的经验猜测。很难找到足够的分区数量,但这是值得的。让我知道它是否有所帮助或您是否仍然遇到瓶颈。

© www.soinside.com 2019 - 2024. All rights reserved.