如何从 Spark 作业中检索通过 Dataproc 提交的作业的 jobId

问题描述 投票:0回答:3

我想获取在 Spark 上下文中运行的 Spark 作业的 jobId。 Dataproc 是否将此信息存储在 Spark Context 中?

apache-spark spark-streaming google-cloud-dataproc dataproc
3个回答
1
投票

Dataproc 作业 ID 不能直接在 SparkContext 中使用。您可能需要调用 Dataproc Jobs API 来列出与集群关联的所有作业并找到它。


1
投票

带着一点耐心,我找到了这个解决方案(我正在使用 Dataproc 图像2.0-debian10、Spark 3.1.2、Scala 2.12.14)。

简而言之,我注意到您可以按照以下步骤获取 Dataproc 作业 ID

  1. SparkContext 访问底层 SparkConf
  2. 获取 spark.yarn.tags 配置的值
  3. 通过一些字符串处理从该值中提取 id

我编写了这个隐式类,它将 ID 提取为可选值

implicit class RichSparkConf(protected val conf: SparkConf) {

  def dataprocJobId: Option[String] = {

    for {
      yarnTags <- conf.getOption(RichSparkConf.SparkYarnTags)
      dataProcJobIdTag <- yarnTags.split(",").find(_.startsWith(RichSparkConf.DataprocJobIdPrefix))
    } yield dataProcJobIdTag.substring(RichSparkConf.DataprocJobIdPrefix.length)
  }
}

object RichSparkConf {

  val DataprocJobIdPrefix = "dataproc_job_"
  val SparkYarnTags = "spark.yarn.tags"
}

根据经验,我通常会尝试查看所有 SparkConf 键值对,看看是否可以从中得到符合我要求的东西。 希望有帮助;)


0
投票

如果有人感兴趣,Python 中的以下函数将捕获 dataproc jobId:

import pyspark
sc = pyspark.SparkContext()

def extract_jobid(sc):
    # Access the underlying SparkConf
    spark_conf = sc.getConf()

    # Get the value of spark.yarn.tags configuration
    spark_conf = spark_conf.get("spark.yarn.tags")

    # Extract the jobId from yarn_tags using string processing
    # assuming yarn_tags format: "dataproc_job_<job_id>"
    job_id = None
    if yarn_tags:
        tags = yarn_tags.split(",")
        for tag in tags:
            if (tag.startswith("dataproc_job_") and not tag.startswith("dataproc_job_attempt_timestamp_")):
                job_id = tag.split("_")[2]
                break
    return job_id 

 # Simply call to the function to output the dataproc jobId
 extract_jobid(sc)
© www.soinside.com 2019 - 2024. All rights reserved.