我们在 EMR 上运行许多 Pyspark 作业。执行的管道是相同的,但输入可以极大地改变峰值内存利用率,并且该利用率随着时间的推移而增长。我想自动写出提交到 EMR 集群的每个步骤的峰值内存利用率 - 实际上,“您在使用高峰期使用了 10TB RAM”(是的,这对于我们的工作来说很常见)。我们的工作不受 CPU 或任何其他指标的限制,内存是我所关心的,不过如果写出所有聚合指标可以简化我愿意接受的方法。
如果重要的话,我们使用集群模式,以yarn作为集群管理器。我们还将这些作业作为 Docker 容器提交。
我已经尝试了以下两种方法,但我非常接受这样的想法:我想得太多了……感觉做起来并不那么困难:
spark = init_spark()
do_some_stuff()
spark.metrics.peakMemoryUtilization()
所以,如果我遗漏了什么,请教育我。
尝试 1 - 扩展 JVM 的 Spark Listener 的自定义类
摆弄 Github CoPilot 的一些建议(我知道),我被指出使用 SparkListeners 来完成这项任务,但是我在这种方法上没有取得任何成功。建议的方法似乎是创建一个使用 JVM 对任务的访问权限的自定义类。
from pyspark.sql import SparkSession
from pyspark import SparkConf
from py4j.java_gateway import java_import
class MemoryUsageListener:
def __init__(self):
self.peak_memory_usage = 0
def on_task_end(self, task_end):
metrics = task_end.taskMetrics()
memory_used = metrics.peakExecutionMemory()
if memory_used > self.peak_memory_usage:
self.peak_memory_usage = memory_used
# Getter function for adding this to the SparkContext's Listeners
def get_listener(self, sc):
gw = sc._gateway
java_import(gw.jvm, "org.apache.spark.scheduler.SparkListener")
java_import(gw.jvm, "org.apache.spark.scheduler.SparkListenerTaskEnd")
class JavaListener(gw.jvm.SparkListener):
def __init__(self, parent):
super(gw.jvm.SparkListener, self).__init__()
self.parent = parent
def onTaskEnd(self, taskEnd):
self.parent.on_task_end(taskEnd)
return JavaListener(self)
def get_peak_memory_usage(self):
return self.peak_memory_usage
设置如下:
conf = SparkConf().setAppName("MemoryUsageTracker")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# Create and register the listener
memory_listener = MemoryUsageListener()
java_listener = memory_listener.get_listener(sc)
sc._jsc.sc().addSparkListener(java_listener)
... do some spark stuff here (run the actual job) ...
peak_memory_usage = memory_listener.get_peak_memory_usage()
write_to_file(peak_memory_usage)
我广泛地使用了上述建议,但遇到了以下问题:
JavaClass.__init__() "takes 3 positional arguments but 4 were given"
当类初始化时,特别是它讨厌class JavaListener(gw.jvm.SparkListener)
。我尝试过移动 super 声明,将 Java 类移动到一个完全独立的函数中,没有雪茄。这看起来很直观,但不幸的是,显然我不明白这种 OOP 方法是如何工作的。
尝试 2 - 对 Yarn 的 API 调用
好吧,所以仅仅询问 Spark 它在做什么的直观解决方案不起作用。我发现的另一个建议是使用 requests 库并点击 api 端点来获取有关执行器的详细信息。
import requests
from pyspark.sql import SparkSession
from pyspark import SparkConf
def get_executor_memory_metrics(app_id, spark_history_server_url):
response = requests.get(f"{spark_history_server_url}/api/v1/applications/{app_id}/executors")
if response.status_code != 200:
raise Exception(f"Failed to fetch executor metrics, status code: {response.status_code}")
executors = response.json()
return executors
def calculate_peak_memory_usage(executors):
# i'm not certain this is the correct dict access, it's what copilot generated, but it's not my current issue
peak_memory_usage = max(executor['peakMemoryMetrics']['JVMHeapMemory'] for executor in executors if 'peakMemoryMetrics' in executor)
return peak_memory_usage
sc = spark.sparkContext
app_id = sc.applicationId
master_url = sc.master
# I'm not convinced this logic is actually sound, but I don't have access to modify the cluster / yarn settings.
yarn_resource_manager_host = socket.getfqdn()
yarn_port = 18080
# At the end of the job
executors = get_executor_memory_metrics(app_id, f"{yarn_resource_manager_host}:{yarn_port}")
peak_memory_usage = calculate_peak_memory_usage(executors)
显然,上面的代码将被打包在一个线程中,以循环并持续监视峰值使用情况——但是,在转到该部分之前,我无法让请求部分实际处理持续的“连接被拒绝”错误。我已经确认它正确接收了集群的 IP 和应用程序 ID 并输入了 URL,尽管我无法在这里共享它们——我不确定端口号是否正确,但我不确定是否有更好的方法在运行时访问它。
这需要一些来回,但为了 500 我愿意帮忙。
您尝试过挖掘JMX信息吗?记得启用JMX并在-D参数中指定
-Dcom.sun.management.jmxremote.port=600
不要编写任何代码,只需使用一些通用的 JMX 客户端。
一旦开始审核每个职位详细信息的统计数据...不确定是否给出了这些,我将尝试在此处查看您的评论