我需要创建一个 dag,其中包含一些应每天执行的任务,以及一个应每月 1 日执行的任务:
task_1:应该每天执行 task_2:应每月执行一次,每月 1 日 task_3:应该每天执行
所以基本上我的 dag 应该是一个简单的任务序列,其中如果execution_date 不是本月 1 日,则应以某种方式跳过 task_2,或者是具有某些分支的 dag,应始终执行 task_1 和 task_3,并且仅当execution_date 时才执行 task_2是这个月的 1 号。
我已经尝试过短路装饰器,以及使用Python函数的BranchPythonOperator,该函数根据日期逻辑返回任务,但是我似乎无法做到这一点。如何使用 TaskFlow API 完成此任务? 谢谢
@dag(
start_date=datetime(2024,9,9),
schedule_interval=None
)
def my_dag():
task_1 = EmptyOperator(task_id='task_1')
task_2 = EmptyOperator(task_id='task_2')
task_3 = EmptyOperator(task_id='task_3')
def monthly_branch(**kwargs):
execution_date = kwargs['execution_date']
if execution_date.day == 1:
return 'task_2'
elif execution_date.day > 1:
return 'task_3'
else:
return None
branch = BranchPythonOperator(
task_id = "branch",
python_callable = monthly_branch
)
task_1 >> branch
branch >> task_2 >> task_3
branch >> task_3
my_dag()
这里我假设如果task_2运行,那么task_3不应该运行。
@dag(
start_date=datetime(2024, 9, 9),
schedule_interval=None
)
def my_dag():
task_1 = EmptyOperator(task_id='task_1')
task_2 = EmptyOperator(task_id='task_2')
task_3 = EmptyOperator(task_id='task_3')
@task.branch(task_id="branch")
def monthly_branch(**context):
execution_date = context['execution_date']
if execution_date.day == 1:
return "task_2"
elif execution_date.day > 1:
return "task_3"
else:
return None
task_1 >> monthly_branch() >> (task_2, task_3)
my_dag()