我试图了解Airflow是否支持在DAG中跳过任务以进行临时执行?
让我们说我的DAG图如下所示:task1> task2> task3> task4
我想从task3手动启动我的DAG,最好的方法是什么?
我读过“ShortCircuitOperator”,但我正在寻找更多的临时解决方案,一旦触发执行就可以应用。
谢谢!
您可以合并ShortCircuitOperator SkipMixin的uses under the hood来跳过下游任务。
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults
class mySkippingOperator(BaseOperator, SkipMixin)
@apply_defaults
def __init__(self,
condition,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.condition = condition
def execute(self, context):
if self.condition:
self.log.info('Proceeding with downstream tasks...')
return
self.log.info('Skipping downstream tasks...')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
self.log.info("Done.")
从Apache Airflow的构建方式,您可以编写逻辑/分支来确定要运行的任务。
但
您无法从中间的任何任务开始执行任务。排序完全由依赖管理(upstream / downstrem)定义。
但是,如果您使用芹菜操作符,则可以忽略运行中的所有依赖项,并要求气流执行任务。然后,这不会阻止上游任务被安排。