将 EMR 上 Pyspark 作业的峰值内存利用率写入文件

问题描述 投票:0回答:1

我们在 EMR 上运行许多 Pyspark 作业。执行的管道是相同的,但输入可以极大地改变峰值内存利用率,并且该利用率随着时间的推移而增长。我想自动写出提交到 EMR 集群的每个步骤的峰值内存利用率 - 实际上,“您在使用高峰期使用了 10TB RAM”(是的,这对于我们的工作来说很常见)。我们的工作不受 CPU 或任何其他指标的限制,内存是我所关心的,不过如果写出所有聚合指标可以简化我愿意接受的方法。

如果重要的话,我们使用集群模式,以yarn作为集群管理器。我们还将这些作业作为 Docker 容器提交。

我已经尝试了以下两种方法,但我非常接受这样的想法:我想得太多了……感觉做起来并不那么困难:

spark = init_spark()

do_some_stuff()

spark.metrics.peakMemoryUtilization()

所以,如果我遗漏了什么,请教育我。

尝试 1 - 扩展 JVM 的 Spark Listener 的自定义类

摆弄 Github CoPilot 的一些建议(我知道),我被指出使用 SparkListeners 来完成这项任务,但是我在这种方法上没有取得任何成功。建议的方法似乎是创建一个使用 JVM 对任务的访问权限的自定义类。

from pyspark.sql import SparkSession
from pyspark import SparkConf
from py4j.java_gateway import java_import

class MemoryUsageListener:
    def __init__(self):
        self.peak_memory_usage = 0

    def on_task_end(self, task_end):
        metrics = task_end.taskMetrics()
        memory_used = metrics.peakExecutionMemory()
        if memory_used > self.peak_memory_usage:
            self.peak_memory_usage = memory_used

    # Getter function for adding this to the SparkContext's Listeners
    def get_listener(self, sc):
        gw = sc._gateway
        java_import(gw.jvm, "org.apache.spark.scheduler.SparkListener")
        java_import(gw.jvm, "org.apache.spark.scheduler.SparkListenerTaskEnd")

        class JavaListener(gw.jvm.SparkListener):
            def __init__(self, parent):
                super(gw.jvm.SparkListener, self).__init__()
                self.parent = parent

            def onTaskEnd(self, taskEnd):
                self.parent.on_task_end(taskEnd)

        return JavaListener(self)

    def get_peak_memory_usage(self):
        return self.peak_memory_usage

设置如下:

conf = SparkConf().setAppName("MemoryUsageTracker")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Create and register the listener
memory_listener = MemoryUsageListener()
java_listener = memory_listener.get_listener(sc)
sc._jsc.sc().addSparkListener(java_listener)

... do some spark stuff here (run the actual job) ...

peak_memory_usage = memory_listener.get_peak_memory_usage()
write_to_file(peak_memory_usage)

我广泛地使用了上述建议,但遇到了以下问题:

JavaClass.__init__() "takes 3 positional arguments but 4 were given"
当类初始化时,特别是它讨厌
class JavaListener(gw.jvm.SparkListener)
。我尝试过移动 super 声明,将 Java 类移动到一个完全独立的函数中,没有雪茄。这看起来很直观,但不幸的是,显然我不明白这种 OOP 方法是如何工作的。

尝试 2 - 对 Yarn 的 API 调用

好吧,所以仅仅询问 Spark 它在做什么的直观解决方案不起作用。我发现的另一个建议是使用 requests 库并点击 api 端点来获取有关执行器的详细信息。

import requests
from pyspark.sql import SparkSession
from pyspark import SparkConf

def get_executor_memory_metrics(app_id, spark_history_server_url):
    response = requests.get(f"{spark_history_server_url}/api/v1/applications/{app_id}/executors")
    if response.status_code != 200:
        raise Exception(f"Failed to fetch executor metrics, status code: {response.status_code}")
    executors = response.json()
    return executors

def calculate_peak_memory_usage(executors):
    # i'm not certain this is the correct dict access, it's what copilot generated, but it's not my current issue
    peak_memory_usage = max(executor['peakMemoryMetrics']['JVMHeapMemory'] for executor in executors if 'peakMemoryMetrics' in executor)
    return peak_memory_usage

sc = spark.sparkContext
app_id = sc.applicationId

master_url = sc.master
# I'm not convinced this logic is actually sound, but I don't have access to modify the cluster / yarn settings. 
yarn_resource_manager_host = socket.getfqdn()
yarn_port = 18080

# At the end of the job
executors = get_executor_memory_metrics(app_id, f"{yarn_resource_manager_host}:{yarn_port}")
peak_memory_usage = calculate_peak_memory_usage(executors)

显然,上面的代码将被打包在一个线程中,以循环并持续监视峰值使用情况——但是,在转到该部分之前,我无法让请求部分实际处理持续的“连接被拒绝”错误。我已经确认它正确接收了集群的 IP 和应用程序 ID 并输入了 URL,尽管我无法在这里共享它们——我不确定端口号是否正确,但我不确定是否有更好的方法在运行时访问它。

java apache-spark pyspark monitoring amazon-emr
1个回答
0
投票

这需要一些来回,但为了 500 我愿意帮忙。

您尝试过挖掘JMX信息吗?记得启用JMX并在-D参数中指定

-Dcom.sun.management.jmxremote.port=600

不要编写任何代码,只需使用一些通用的 JMX 客户端。

一旦开始审核每个职位详细信息的统计数据...不确定是否给出了这些,我将尝试在此处查看您的评论

© www.soinside.com 2019 - 2024. All rights reserved.