Google Cloud Dataflow Python,检索作业ID

问题描述 投票:1回答:3

我目前正在使用Python中的数据流模板,我想访问作业ID并使用它来保存到特定的Firestore文档。

是否可以访问作业ID?

我在文档中找不到任何相关内容。

python google-cloud-platform google-cloud-dataflow
3个回答
3
投票

您可以通过从管道中调用dataflow.projects().locations().jobs().list来实现此目的(请参阅下面的完整代码)。一种可能性是始终使用相同的作业名称调用模板,这是有意义的,否则作业前缀可以作为运行时参数传递。使用正则表达式解析作业列表以查看作业是否包含名称前缀,如果是,则返回作业ID。如果有多个,它将​​只返回最新的一个(当前正在运行的那个)。

在定义PROJECTBUCKET变量之后,模板被暂存,具有:

python script.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://$BUCKET/staging \
    --temp_location gs://$BUCKET/temp \
    --template_location gs://$BUCKET/templates/retrieve_job_id

然后,在执行模板化作业时指定所需的作业名称(在我的例子中为myjobprefix):

gcloud dataflow jobs run myjobprefix \
   --gcs-location gs://$BUCKET/templates/retrieve_job_id

retrieve_job_id函数将从作业中返回作业ID,更改job_prefix以匹配给定的名称。

import argparse, logging, re
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def retrieve_job_id(element):
  project = 'PROJECT_ID'
  job_prefix = "myjobprefix"
  location = 'us-central1'

  logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))

  try:
    credentials = GoogleCredentials.get_application_default()
    dataflow = build('dataflow', 'v1b3', credentials=credentials)

    result = dataflow.projects().locations().jobs().list(
      projectId=project,
      location=location,
    ).execute()

    job_id = "none"

    for job in result['jobs']:
      if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
        job_id = job['id']
        break

    logging.info("Job ID: {}".format(job_id))
    return job_id

  except Exception as e:
    logging.info("Error retrieving Job ID")
    raise KeyError(e)


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  p = beam.Pipeline(options=pipeline_options)

  init_data = (p
               | 'Start' >> beam.Create(["Init pipeline"])
               | 'Retrieve Job ID' >> beam.FlatMap(retrieve_job_id))

  p.run()


if __name__ == '__main__':
  run()

2
投票

您可以使用Google Dataflow API。使用projects.jobs.list方法检索数据流作业ID。


0
投票

从浏览文档开始,您应该从启动作业中获得的响应应该包含一个json主体,其中包含属性“job”,它是Job的一个实例。

您应该能够使用它来获取您需要的ID。

如果您使用google cloud sdk进行数据流,那么当您在different object上调用create方法时,可能会获得templates()

© www.soinside.com 2019 - 2024. All rights reserved.