根据条件执行任务

问题描述 投票:0回答:1

我需要创建一个 dag,其中包含一些应每天执行的任务,以及一个应每月 1 日执行的任务:

task_1:应该每天执行 task_2:应每月执行一次,每月 1 日 task_3:应该每天执行

所以基本上我的 dag 应该是一个简单的任务序列,其中如果execution_date 不是本月 1 日,则应以某种方式跳过 task_2,或者是具有某些分支的 dag,应始终执行 task_1 和 task_3,并且仅当execution_date 时才执行 task_2是这个月的 1 号。

我已经尝试过短路装饰器,以及使用Python函数的BranchPythonOperator,该函数根据日期逻辑返回任务,但是我似乎无法做到这一点。如何使用 TaskFlow API 完成此任务? 谢谢

@dag(
    start_date=datetime(2024,9,9),
    schedule_interval=None
)
def my_dag():

    task_1 = EmptyOperator(task_id='task_1')
    task_2 = EmptyOperator(task_id='task_2')
    task_3 = EmptyOperator(task_id='task_3')


    def monthly_branch(**kwargs):
        execution_date = kwargs['execution_date']
        if execution_date.day == 1:
            return 'task_2'
        elif execution_date.day > 1:
            return 'task_3'
        else:
            return None

    branch = BranchPythonOperator(
        task_id = "branch",
        python_callable = monthly_branch 
    ) 


    task_1 >> branch
    branch >> task_2 >> task_3
    branch >> task_3 

my_dag()
python airflow
1个回答
0
投票

这里我假设如果task_2运行,那么task_3不应该运行。

@dag(
    start_date=datetime(2024, 9, 9),
    schedule_interval=None
)
def my_dag():
    task_1 = EmptyOperator(task_id='task_1')
    task_2 = EmptyOperator(task_id='task_2')
    task_3 = EmptyOperator(task_id='task_3')

    @task.branch(task_id="branch")
    def monthly_branch(**context):
        execution_date = context['execution_date']
        if execution_date.day == 1:
            return "task_2"
        elif execution_date.day > 1:
            return "task_3"
        else:
            return None

    task_1 >> monthly_branch() >> (task_2, task_3)


my_dag()
© www.soinside.com 2019 - 2024. All rights reserved.