Stack Overflow 社区您好,
我正在 GCP Cloud Composer 上运行 Airflow(版本 2.5.3)DAG,其中有几个任务将触发基于 java 的数据流作业。任务的代码如下所示:
trigger_dataflow = BeamRunJavaPipelineOperator(
task_id=f"template_task_id",
dag=dag,
task_concurrency=1,
depends_on_past=True,
runner="DataflowRunner",
jar=jar_path,
job_class=jar_class,
pipeline_options={
'task': 'template_task',
'sql': "template_sql",
'bigtableInstanceId': bt_instance_id,
**({"network": dataflow_network} if dataflow_network is not None else {}),
**({"subnetwork": dataflow_subnetwork} if dataflow_subnetwork is not None else {}),
},
dataflow_config=DataflowConfiguration(
job_name=f'template_job_nmae',
poll_sleep=60,
project_id=dataflow_project_id,
service_account=dataflow_service_account,
),
params={
"tableId": f"template_table"
}
)
任务不断失败,返回代码为Negsignal.SIGKILL。经过一番调查,我意识到这段代码表明工作人员缺乏资源,导致任务被强制终止。
我进一步查看了Airflow任务日志,发现该任务不断检查数据流作业的状态(每1秒),我怀疑向数据流端发出的请求过多导致了该问题。
我尝试使用 dataflow_config 中的字段 poll_sleep 将每个数据流状态轮询之间的睡眠时间增加到 60,但是当我重新运行 Airflow 任务时,它仍然每 1 秒检查一次数据流。
有人可以让我知道我的方法有什么问题吗?或者有什么建议的方法可以这样做吗?提前非常感谢!