触发规则“one_success”不适用于“DataprocCreateClusterOperator”

问题描述 投票:0回答:1

我有一种情况,我的操作符之一 DataprocCreateClusterOperator 永远不会触发,就好像仍然为其设置了“all_success”一样。如果这是第一个任务,它运行得很好,但我不想每次 dag 运行时都创建一个集群,因为可能不需要它。令我惊讶的是,将上游放在它上面并将其设置为“one_success”是行不通的。 我知道“one_success”规则通常有效,因为“exit_task”已设置它并且它总是运行没有问题。 enter image description here 这是我的代码:

    check_cm_files_present_task = BranchPythonOperator(
    task_id='check_cm_files_present_task',
    python_callable=check_cm_files_present,
)

check_cmts_struct_files_present_task = BranchPythonOperator(
    task_id='check_cmts_struct_files_present_task',
    python_callable=check_cmts_struct_files_present,
)

check_cmts_meas_files_present_task = BranchPythonOperator(
    task_id='check_cmts_meas_files_present_task',
    python_callable=check_cmts_meas_files_present,
)
move_to_valid_cm = BashOperator(
    task_id='move_to_valid_cm',
    bash_command='''gsutil -m mv gs://A gs://B,
    dag = dag
    )
move_to_valid_cmts_struct = BashOperator(
    task_id='move_to_valid_cmts_struct',
    bash_command='''gsutil -m mv gs://C gs://D,
    dag = dag
    )
move_to_valid_cmts_meas = BashOperator(
    task_id='move_to_valid_cmts_meas',
    bash_command='''gsutil -m mv gs://E gs://F,
    dag = dag
    )

create_dataproc_cluster = dataproc_operator.DataprocCreateClusterOperator(
    task_id='create_dataproc_cluster',
    cluster_name=CLUSTER_WITH_DATE,
    project_id=PROJECT,
    cluster_config=CLUSTER_CONFIG,
    region='europe-west3',
    trigger_rule='one_success'
)

exit_task = DummyOperator(
    task_id='exit_task',
    trigger_rule='one_success'
)

check_cm_files_present_task >> move_to_valid_cm >> run_dataproc_job_cm
check_cm_files_present_task >> exit_task

check_cmts_struct_files_present_task >> move_to_valid_cmts_struct >> run_dataproc_job_cmts_struct
check_cmts_struct_files_present_task >> exit_task

check_cmts_meas_files_present_task >> move_to_valid_cmts_meas >> run_dataproc_job_cmts_meas
check_cmts_meas_files_present_task >> exit_task

[move_to_valid_cm, move_to_valid_cmts_struct, move_to_valid_cmts_meas] >> create_dataproc_cluster >> [run_dataproc_job_cm, run_dataproc_job_cmts_struct, run_dataproc_job_cmts_meas]
airflow google-cloud-dataproc
1个回答
0
投票

这个从

BranchPythonOperator
的跳跃状态级联到下游任务,直到满足
trigger_rule=none_failed_min_one_success

其背后的想法是,当您进行分支时,您想要跳过整个不相关的分支。然而,在某些像您这样的情况下,您想要决定何时停止级联。

Airflow docs 解释一下这个场景:

join = EmptyOperator(task_id="join", dag=dag)

如您所见,连接任务被跳过,因为上游之一是发送跳过“信号”的分支运算符,并且

join
任务是该分支的一部分。

enter image description here

但是 join 也是成功分支的一部分,所以如果我们将其触发规则更改为

join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success ", dag=dag)

Airflow 不会进一步级联跳过信号并让调度程序处理

join
任务。

enter image description here

在您的情况下,您需要将

one_success
替换为
none_failed_min_one_success

create_dataproc_cluster = dataproc_operator.DataprocCreateClusterOperator(
    task_id='create_dataproc_cluster',
    cluster_name=CLUSTER_WITH_DATE,
    project_id=PROJECT,
    cluster_config=CLUSTER_CONFIG,
    region='europe-west3',
    trigger_rule='none_failed_min_one_success'
)
© www.soinside.com 2019 - 2024. All rights reserved.