我想跟踪群集中所有节点上的作业/任务/阶段的全局故障率。当前的想法是解析历史服务器编写的HDFS中的日志文件并获取此数据,但这似乎很麻烦。有没有更好的方法?理想情况下,我可以在每个作业提交的客户端访问此信息,但事实并非如此。推荐的解决方法是什么?
一个想法是扩展SparkListener并围绕故障将度量收集到您想要的任何位置(例如,将事件推送到ELK。
一些有用的事件:
case class SparkListenerExecutorBlacklisted(
time: Long,
executorId: String,
taskFailures: Int)
extends SparkListenerEvent
case class SparkListenerExecutorBlacklistedForStage(
time: Long,
executorId: String,
taskFailures: Int,
stageId: Int,
stageAttemptId: Int)
extends SparkListenerEvent
case class SparkListenerNodeBlacklistedForStage(
time: Long,
hostId: String,
executorFailures: Int,
stageId: Int,
stageAttemptId: Int)
extends SparkListenerEvent
case class SparkListenerNodeBlacklisted(
time: Long,
hostId: String,
executorFailures: Int)
extends SparkListenerEvent
和听众:
def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted): Unit
def onExecutorBlacklistedForStage(executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage): Unit
def onNodeBlacklistedForStage(nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage): Unit
def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit
[请注意,您可以通过Spark上下文的addSparkListener
订阅侦听器。此其他堆栈溢出线程中的更多详细信息:How to implement custom job listener/tracker in Spark?
注意:要使其与PySpark一起使用,请遵循另一个Stack Overflow线程中描述的步骤:How to add a SparkListener from pySpark in Python?