Airflow是否有一种方法可以从(Python)操作员中跳过当前任务?例如:
def execute():
if condition:
skip_current_task()
task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)
跳过下游任务不适合我(此答案中建议的解决方案:How to skip tasks on Airflow?)以及分支。是否有一种方法可以从操作员内部将其状态标记为skipped
?
跳过任务的最简单方法:
def execute():
if condition:
return
task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)
不幸的是,它将任务标记为DONE
想通了!跳过任务很简单:
def execute():
if condition:
raise AirflowSkipException
task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)