我有一种情况,我的操作符之一 DataprocCreateClusterOperator 永远不会触发,就好像仍然为其设置了“all_success”一样。如果这是第一个任务,它运行得很好,但我不想每次 dag 运行时都创建一个集群,因为可能不需要它。令我惊讶的是,将上游放在它上面并将其设置为“one_success”是行不通的。 我知道“one_success”规则通常有效,因为“exit_task”已设置它并且它总是运行没有问题。 这是我的代码:
check_cm_files_present_task = BranchPythonOperator(
task_id='check_cm_files_present_task',
python_callable=check_cm_files_present,
)
check_cmts_struct_files_present_task = BranchPythonOperator(
task_id='check_cmts_struct_files_present_task',
python_callable=check_cmts_struct_files_present,
)
check_cmts_meas_files_present_task = BranchPythonOperator(
task_id='check_cmts_meas_files_present_task',
python_callable=check_cmts_meas_files_present,
)
move_to_valid_cm = BashOperator(
task_id='move_to_valid_cm',
bash_command='''gsutil -m mv gs://A gs://B,
dag = dag
)
move_to_valid_cmts_struct = BashOperator(
task_id='move_to_valid_cmts_struct',
bash_command='''gsutil -m mv gs://C gs://D,
dag = dag
)
move_to_valid_cmts_meas = BashOperator(
task_id='move_to_valid_cmts_meas',
bash_command='''gsutil -m mv gs://E gs://F,
dag = dag
)
create_dataproc_cluster = dataproc_operator.DataprocCreateClusterOperator(
task_id='create_dataproc_cluster',
cluster_name=CLUSTER_WITH_DATE,
project_id=PROJECT,
cluster_config=CLUSTER_CONFIG,
region='europe-west3',
trigger_rule='one_success'
)
exit_task = DummyOperator(
task_id='exit_task',
trigger_rule='one_success'
)
check_cm_files_present_task >> move_to_valid_cm >> run_dataproc_job_cm
check_cm_files_present_task >> exit_task
check_cmts_struct_files_present_task >> move_to_valid_cmts_struct >> run_dataproc_job_cmts_struct
check_cmts_struct_files_present_task >> exit_task
check_cmts_meas_files_present_task >> move_to_valid_cmts_meas >> run_dataproc_job_cmts_meas
check_cmts_meas_files_present_task >> exit_task
[move_to_valid_cm, move_to_valid_cmts_struct, move_to_valid_cmts_meas] >> create_dataproc_cluster >> [run_dataproc_job_cm, run_dataproc_job_cmts_struct, run_dataproc_job_cmts_meas]
这个从
BranchPythonOperator
的跳跃状态级联到下游任务,直到满足 trigger_rule=none_failed_min_one_success
。
其背后的想法是,当您进行分支时,您想要跳过整个不相关的分支。然而,在某些像您这样的情况下,您想要决定何时停止级联。
Airflow docs 解释一下这个场景:
join = EmptyOperator(task_id="join", dag=dag)
如您所见,连接任务被跳过,因为上游之一是发送跳过“信号”的分支运算符,并且
join
任务是该分支的一部分。
但是 join 也是成功分支的一部分,所以如果我们将其触发规则更改为
join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success ", dag=dag)
Airflow 不会进一步级联跳过信号并让调度程序处理
join
任务。
在您的情况下,您需要将
one_success
替换为 none_failed_min_one_success
:
create_dataproc_cluster = dataproc_operator.DataprocCreateClusterOperator(
task_id='create_dataproc_cluster',
cluster_name=CLUSTER_WITH_DATE,
project_id=PROJECT,
cluster_config=CLUSTER_CONFIG,
region='europe-west3',
trigger_rule='none_failed_min_one_success'
)