我想获取在 Spark 上下文中运行的 Spark 作业的 jobId。 Dataproc 是否将此信息存储在 Spark Context 中?
Dataproc 作业 ID 不能直接在 SparkContext 中使用。您可能需要调用 Dataproc Jobs API 来列出与集群关联的所有作业并找到它。
带着一点耐心,我找到了这个解决方案(我正在使用 Dataproc 图像2.0-debian10、Spark 3.1.2、Scala 2.12.14)。
简而言之,我注意到您可以按照以下步骤获取 Dataproc 作业 ID
我编写了这个隐式类,它将 ID 提取为可选值
implicit class RichSparkConf(protected val conf: SparkConf) {
def dataprocJobId: Option[String] = {
for {
yarnTags <- conf.getOption(RichSparkConf.SparkYarnTags)
dataProcJobIdTag <- yarnTags.split(",").find(_.startsWith(RichSparkConf.DataprocJobIdPrefix))
} yield dataProcJobIdTag.substring(RichSparkConf.DataprocJobIdPrefix.length)
}
}
object RichSparkConf {
val DataprocJobIdPrefix = "dataproc_job_"
val SparkYarnTags = "spark.yarn.tags"
}
根据经验,我通常会尝试查看所有 SparkConf 键值对,看看是否可以从中得到符合我要求的东西。 希望有帮助;)
如果有人感兴趣,Python 中的以下函数将捕获 dataproc jobId:
import pyspark
sc = pyspark.SparkContext()
def extract_jobid(sc):
# Access the underlying SparkConf
spark_conf = sc.getConf()
# Get the value of spark.yarn.tags configuration
spark_conf = spark_conf.get("spark.yarn.tags")
# Extract the jobId from yarn_tags using string processing
# assuming yarn_tags format: "dataproc_job_<job_id>"
job_id = None
if yarn_tags:
tags = yarn_tags.split(",")
for tag in tags:
if (tag.startswith("dataproc_job_") and not tag.startswith("dataproc_job_attempt_timestamp_")):
job_id = tag.split("_")[2]
break
return job_id
# Simply call to the function to output the dataproc jobId
extract_jobid(sc)