我的用例涉及获取项目中存在的所有流数据流作业的作业 ID 并取消它。更新我的数据流作业的源并重新运行它。
我正在尝试使用 python 来实现这一点。直到现在我还没有遇到任何有用的文档。 我想到使用 python 的库子进程来执行 gcloud 命令作为解决方法。但我再次无法存储结果并使用它。
有人可以指导我什么是最好的方法吗?
除了直接使用其余 API 之外,您还可以在 google-api-python-client 中使用为 API 生成的 Python 绑定。对于简单的调用,它不会增加那么多的价值,但是当传递许多参数时,它比原始 HTTP 库更容易使用。
使用该库,作业列表调用将如下所示
from googleapiclient.discovery import build
import google.auth
credentials, project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
df_service = build('dataflow', 'v1b3', credentials=credentials)
response = df_service.projects().locations().jobs().list(
project_id=project_id,
location='<region>').execute()
您可以像这样直接使用Dataflow Rest api
from google.auth.transport.requests import AuthorizedSession
import google.auth
base_url = 'https://dataflow.googleapis.com/v1b3/projects/'
credentials, project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
project_id = 'PROJECT_ID'
location = 'europe-west1'
authed_session = AuthorizedSession(credentials)
response = authed_session.request('GET', f'{base_url}{project_id}/locations/{location}/jobs')
print(response.json())
您必须导入 google-auth 依赖项。
您还可以添加查询参数
?filter=ACTIVE
以仅获取与您的流作业匹配的活动数据流。
这是您拥有的完整三位一体选项:
安装客户端库:
pip install google-cloud-dataflow-client
然后
from google.cloud.dataflow_v1beta3 import JobsV1Beta3Client, ListJobsRequest
client = JobsV1Beta3Client()
request = ListJobsRequest(
project_id="<PROJECT_ID>",
location="<LOCATION>", # won't work without location
filter="ACTIVE", # remove if you also want finished
)
jobs = client.list_jobs(request=request)
for job in jobs:
print(job)
看起来您只能找到过去 30 天的工作。