如何在 PySpark 作业中检索 Dataproc 的 jobId

问题描述 投票:0回答:2

我运行了多个批处理作业,我想将 dataproc 中的 jobId 引用到保存的输出文件。

这将允许将参数和输出的所有日志与结果相关联。缺点仍然是:随着 YARN 中的执行器消失,无法再获取单个执行器的日志。

google-cloud-dataproc
2个回答
7
投票

Google dataproc 的上下文通过使用标签传递到 Spark 作业中。因此,所有合适的信息都存在于 SparkConfig 中并且可以访问:

pyspark.SparkConf().get("spark.yarn.application.tags", "unknown")
pyspark.SparkConf().get("spark.yarn.tags", "unknown")

输出如下:

dataproc_job_3f4025a0-bce1-a254-9ddc-518a4d8b2f3d

然后可以将该信息分配到我们的导出文件夹,并使用 Dataproc 参考保存输出:

df.select("*").write. \
    format('com.databricks.spark.csv').options(header='true') \
    .save(export_folder)

0
投票

如果有人感兴趣的话,这是Python的答案:

import pyspark
sc = pyspark.SparkContext()

def extract_jobid(sc):
    # Access the underlying SparkConf
    spark_conf = sc.getConf()

    # Get the value of spark.yarn.tags configuration
    spark_conf = spark_conf.get("spark.yarn.tags")

    # Extract the jobId from yarn_tags using string processing
    # assuming yarn_tags format: "dataproc_job_<job_id>"
    job_id = None
    if yarn_tags:
        tags = yarn_tags.split(",")
        for tag in tags:
            if (tag.startswith("dataproc_job_") and not tag.startswith("dataproc_job_attempt_timestamp_")):
                job_id = tag.split("_")[2]
                break
    return job_id 

 # Simply call to the function to output the dataproc jobId
 extract_jobid(sc)
© www.soinside.com 2019 - 2024. All rights reserved.