如何跳过Airflow上的任务?

问题描述 投票:1回答:2

我试图了解Airflow是否支持在DAG中跳过任务以进行临时执行?

让我们说我的DAG图如下所示:task1> task2> task3> task4

我想从task3手动启动我的DAG,最好的方法是什么?

我读过“ShortCircuitOperator”,但我正在寻找更多的临时解决方案,一旦触发执行就可以应用。

谢谢!

airflow directed-acyclic-graphs airflow-scheduler
2个回答
0
投票

您可以合并ShortCircuitOperator SkipMixinuses under the hood来跳过下游任务。

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin)

    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition

    def execute(self, context):

        if self.condition:
           self.log.info('Proceeding with downstream tasks...')
           return

        self.log.info('Skipping downstream tasks...')

        downstream_tasks = context['task'].get_flat_relatives(upstream=False)

        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")

0
投票

从Apache Airflow的构建方式,您可以编写逻辑/分支来确定要运行的任务。

您无法从中间的任何任务开始执行任务。排序完全由依赖管理(upstream / downstrem)定义。

但是,如果您使用芹菜操作符,则可以忽略运行中的所有依赖项,并要求气流执行任务。然后,这不会阻止上游任务被安排。

© www.soinside.com 2019 - 2024. All rights reserved.