我是 Airflow 新手,正在尝试理解任务的运行时机。我不明白为什么在运行下面这个示例时,task_3a 会立即运行。
如何让这个示例 DAG 按声明的依赖顺序运行(即 task_3 完成之后才依次运行 task_3a、task_3b)?示例代码如下:
import logging
from airflow.decorators import task, dag, task_group
from airflow.utils.dates import days_ago
@dag(
    dag_id='taskflow_conditional_dag',
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
)
def my_dag():
    """Define an example TaskFlow DAG with a branch, a linear chain and a task group.

    Declared chain: task_1 -> branching_task -> task_2 -> task_3 ->
    task_after_3_before_4_group (task_3a -> task_3b) -> task_4 -> task_5.

    NOTE: this is the version that reproduces the reported behaviour, where
    task_3a starts immediately instead of waiting for task_3 (see
    https://github.com/apache/airflow/issues/40196). The ``return`` inside
    the task group is what causes it and is kept here on purpose.
    """
    logger = logging.getLogger("airflow.task")

    @task
    def task_1():
        """Log a message and return the value the branch decides on."""
        logger.info("Task 1 running")
        # Never equal to "run_task_2", so branching_task always picks task_3.
        return "run_task_"

    @task.branch
    def branching_task(data):
        """Return the task_id to follow: "task_2" for "run_task_2", else "task_3"."""
        if data == "run_task_2":
            return "task_2"
        return "task_3"

    @task
    def task_2():
        """Placeholder for task 2; it only logs."""
        # Task 2 logic here
        logger.info("Task 2 running")

    # "none_failed" lets a task run when its upstream was skipped by the branch
    # rather than failed -- confirm against the Airflow trigger-rule docs.
    @task(
        trigger_rule="none_failed"
    )
    def task_3():
        """Log that task 3 is running."""
        logger.info("Task 3 running")

    @task(
        trigger_rule="none_failed"
    )
    def task_4():
        """Log that task 4 is running."""
        logger.info("Task 4 running")

    @task_group()
    def task_after_3_before_4_group():
        """Group task_3a -> task_3b, meant to run between task_3 and task_4."""

        @task
        def task_3a():
            """Log that task 3a is running."""
            logger.info("Task 3a running.")

        @task
        def task_3b():
            """Log that task 3b is running."""
            logger.info("Task 3b running.")

        # PITFALL: `a >> b` evaluates to `b`, so this returns task_3b. When the
        # decorated function returns a value, calling the group yields that
        # value instead of the TaskGroup, so the outer chain binds task_3 only
        # to task_3b and task_3a is left without an upstream. Dropping the
        # `return` makes the call yield the whole group (apache/airflow#40196).
        return task_3a() >> task_3b()

    @task(
        trigger_rule="none_failed"
    )
    def task_5():
        """Log that task 5 is running."""
        logger.info("Task 5 running")

    data = task_1()
    decision = branching_task(data)
    task_2_result = task_2()
    task_3_result = task_3()
    task_4_result = task_4()
    task_5_results = task_5()

    # Passing `data` into branching_task already links the two tasks; the
    # explicit edge is kept from the original.
    data >> decision
    (
        decision
        >> task_2_result
        >> task_3_result
        >> task_after_3_before_4_group()
        >> task_4_result
        >> task_5_results
    )


# Instantiate the DAG at module level. Note that this rebinds the imported
# `dag` decorator name, which is no longer needed past this point.
dag = my_dag()
我在 Airflow 的 GitHub Issues 中找到了下面这个 issue,其中描述的正是这个问题:https://github.com/apache/airflow/issues/40196
原来的任务组定义(带返回值):
@task_group()
def task_after_3_before_4_group():
    """Group task_3a -> task_3b (original version, which returns a value)."""

    @task
    def task_3a():
        """Log that task 3a is running."""
        logger.info("Task 3a running.")

    @task
    def task_3b():
        """Log that task 3b is running."""
        logger.info("Task 3b running.")

    # `a >> b` evaluates to `b`, so the group call returns task_3b rather than
    # the TaskGroup; an upstream chained onto the call is then wired to task_3b
    # only, leaving task_3a without an upstream (apache/airflow#40196).
    return task_3a() >> task_3b()
改为删除返回值的写法:
@task_group()
def task_after_3_before_4_group():
    """Group task_3a -> task_3b (version without a return value)."""

    @task
    def task_3a():
        """Log that task 3a is running."""
        logger.info("Task 3a running.")

    @task
    def task_3b():
        """Log that task 3b is running."""
        logger.info("Task 3b running.")

    # Deliberately not returned: with no return value the group call yields
    # the TaskGroup itself, so `upstream >> group() >> downstream` applies to
    # the group as a whole (apache/airflow#40196).
    task_3a() >> task_3b()
这样修改之后,一切都按预期顺序运行。