Airflow task group execution order

Question

I'm new to Airflow and trying to understand when tasks run. I don't understand why task_3a runs immediately when I run this example.

How can I make this example DAG run in this order:

  • Task 1
  • Task 2 (if instructed to run)
  • Task 3
  • Task 3a and Task 3b (running them in parallel would be nice)
  • Task 4
  • Task 5

import logging
from airflow.decorators import task, dag, task_group
from airflow.utils.dates import days_ago

@dag(
    dag_id='taskflow_conditional_dag',
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
)
def my_dag():

    logger = logging.getLogger("airflow.task")

    @task
    def task_1():
        logger.info("Task 1 running")
        return "run_task_"

    @task.branch
    def branching_task(data):
        if data == "run_task_2":
            return "task_2"
        return "task_3"

    @task
    def task_2():
        # Task 2 logic here
        logger.info("Task 2 running")
        pass

    @task(
        trigger_rule="none_failed"
    )
    def task_3():
        logger.info("Task 3 running")
        pass

    @task(
        trigger_rule="none_failed"
    )
    def task_4():
        logger.info("Task 4 running")
        pass

    @task_group()
    def task_after_3_before_4_group():

        @task
        def task_3a():
            logger.info("Task 3a running.")
            pass

        @task
        def task_3b():
            logger.info("Task 3b running.")
            pass

        return task_3a() >> task_3b()


    @task(
        trigger_rule="none_failed"
    )
    def task_5():
        logger.info("Task 5 running")
        pass

    data = task_1()
    decision = branching_task(data)

    task_2_result = task_2()
    task_3_result = task_3()
    task_4_result = task_4()
    task_5_results = task_5()

    data >> decision
    decision >> task_2_result >> task_3_result >> task_after_3_before_4_group() >> task_4_result >> task_5_results

dag = my_dag()
Answer

I found the following issue in the Airflow GitHub issue tracker that describes this problem: https://github.com/apache/airflow/issues/40196
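The behaviour described in that issue can be mimicked without Airflow. The sketch below is a toy model under two assumptions taken from how TaskFlow behaves: `a >> b` returns `b`, and a `@task_group` function that returns a non-None value hands that value, rather than the group, to the surrounding `>>` chain. `Wireable`, `Task`, `Group`, `run_group` and `build_dag` are hypothetical helpers for illustration only, not Airflow classes, and the root/leaf rule is a simplification.

```python
from __future__ import annotations

from typing import Callable, Optional


class Wireable:
    """Hypothetical toy node (not Airflow's API). `a >> b` links a's leaves
    to b's roots and returns b, like Airflow's `>>` returns its right operand."""

    roots: list[Task]
    leaves: list[Task]

    def __rshift__(self, other: Wireable) -> Wireable:
        for leaf in self.leaves:
            for root in other.roots:
                root.upstream.add(leaf.name)
        return other


class Task(Wireable):
    def __init__(self, name: str) -> None:
        self.name = name
        self.upstream: set[str] = set()

    @property
    def roots(self) -> list[Task]:
        return [self]

    @property
    def leaves(self) -> list[Task]:
        return [self]


class Group(Wireable):
    def __init__(self, roots: list[Task], leaves: list[Task]) -> None:
        self.roots = roots
        self.leaves = leaves


def run_group(body: Callable[[list[Task]], Optional[Wireable]]) -> Wireable:
    """Toy version of the task-group rule: forward a non-None return value
    as-is, otherwise return the whole group (its root and leaf tasks)."""
    created: list[Task] = []
    retval = body(created)
    if retval is not None:
        return retval
    names = {t.name for t in created}
    # Roots have no upstream inside the group; leaves have nothing inside depending on them.
    roots = [t for t in created if not (t.upstream & names)]
    inner_upstream = set().union(*(t.upstream & names for t in created))
    leaves = [t for t in created if t.name not in inner_upstream]
    return Group(roots, leaves)


def build_dag(return_chain: bool) -> dict[str, list[str]]:
    t3, t3a, t3b, t4 = Task("task_3"), Task("task_3a"), Task("task_3b"), Task("task_4")

    def body(created: list[Task]) -> Optional[Wireable]:
        created.extend([t3a, t3b])
        chained = t3a >> t3b  # evaluates to t3b
        return chained if return_chain else None

    t3 >> run_group(body) >> t4
    return {t.name: sorted(t.upstream) for t in (t3, t3a, t3b, t4)}


print(build_dag(return_chain=True))
# {'task_3': [], 'task_3a': [], 'task_3b': ['task_3', 'task_3a'], 'task_4': ['task_3b']}
print(build_dag(return_chain=False))
# {'task_3': [], 'task_3a': ['task_3'], 'task_3b': ['task_3a'], 'task_4': ['task_3b']}
```

With the chain returned, `task_3a` ends up with no upstream task at all, which is why it starts immediately; without the return, `task_3` feeds the group's root.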

I changed the task group from this:

    @task_group()
    def task_after_3_before_4_group():

        @task
        def task_3a():
            logger.info("Task 3a running.")
            pass

        @task
        def task_3b():
            logger.info("Task 3b running.")
            pass

        return task_3a() >> task_3b()

to this, removing the return value:

    @task_group()
    def task_after_3_before_4_group():

        @task
        def task_3a():
            logger.info("Task 3a running.")
            pass

        @task
        def task_3b():
            logger.info("Task 3b running.")
            pass

        task_3a() >> task_3b()

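With that change, everything runs in the expected order. The cause: when a `@task_group` function returns something other than `None`, calling the group returns that value instead of the TaskGroup. `task_3a() >> task_3b()` evaluates to its right-hand operand, `task_3b`, so `task_3_result >> task_after_3_before_4_group() >> task_4_result` only wired `task_3 >> task_3b >> task_4`. `task_3a` was left without any upstream task and started as soon as the DAG run began. Without a return value the call returns the TaskGroup itself, and `>>` connects `task_3` to the group's root task (`task_3a`) and the group's leaf task (`task_3b`) to `task_4`.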
