如何从另一个dag.py文件中定义Airflow中的子dag taks?

问题描述 投票:0回答:1

我想做一个父DAG和几个子DAG,这些子DAG会通过 SubDagOperator.

  • 我只能找到如何在SubDagOperator任务中动态创建Subdag的例子。
  • 然而,在这种情况下,我想要在DAG.py文件中定义的独立子DAG,并将这些子DAG拼接在父DAG中。

如果我在设置SubDAGOperator任务时只输入子Dag的Dag名称。

task_1 = SubDagOperator(
    task_id="task_1",
    subdag=child_dag_name,
    dag=parent_dag
)

我得到以下错误信息:

NameError: name 'child_dag_name' is not defined
python airflow airflow-scheduler airflow-operator
1个回答
0
投票

这个答案同样依赖于Python的知识和Airflow的知识。

回顾

  • python: importing一个模块意味着所有顶层(缩进0)的东西都会立即执行(在导入过程中)。
  • airflow仅指那些 DAG 对象 是由 scheduler webserver 在dag定义文件顶层(缩进0)发生的事情

牢记以上2点,你可以这么做

  • 开创 辅助工具功能 在你 child_dag.py 文件,并返回一个 DAG 儿戏
  • 使用该帮助函数来实例化顶层子DAG,以及创建 SubDagOperator 任务

dag_object_builder.py。

from typing import Dict, Any

from airflow.models import DAG


def create_dag_object(dag_id: str, dag_params: Dict[str, Any]) -> DAG:
    dag: DAG = DAG(dag_id=dag_id, **dag_params)
    return dag

child_dag.py

from datetime import datetime
from typing import Dict, Any

from airflow.models import DAG

from src.main.subdag_example import dag_object_builder

default_args: Dict[str, Any] = {
    "owner": "my_owner",
    "email": ["my_username@my_domain.com"],
    "weight_rule": "downstream",
    "retries": 1
}

...


def create_child_dag_object(dag_id: str) -> DAG:
    my_dag: DAG = dag_object_builder.create_dag_object(
        dag_id=dag_id,
        dag_params={
            "start_date": datetime(year=2019, month=7, day=10, hour=21, minute=30),
            "schedule_interval": None,
            "max_active_runs": 1,
            "default_view": "graph",
            "catchup": False,
            "default_args": default_args
        }
    )
    return my_dag


my_child_dag: DAG = create_child_dag_object(dag_id="my_child_dag")

parent_dag.py

from datetime import datetime
from typing import Dict, Any

from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator

from src.main.subdag_example import child_dag
from src.main.subdag_example import dag_object_builder

default_args: Dict[str, Any] = {
    "owner": "my_owner",
    "email": ["my_username@my_domain.com"],
    "weight_rule": "downstream",
    "retries": 1
}

my_parent_dag: DAG = dag_object_builder.create_dag_object(
    dag_id="my_parent_dag",
    dag_params={
        "start_date": datetime(year=2019, month=7, day=10, hour=21, minute=30),
        "schedule_interval": None,
        "max_active_runs": 1,
        "default_view": "graph",
        "catchup": False,
        "default_args": default_args
    }
)

...

my_subdag_task: SubDagOperator = SubDagOperator(
    task_id="my_subdag_task",
    dag=my_parent_dag,
    subdag=child_dag.create_child_dag_object(dag_id="my_subdag")
)

  • 如果您的目的是将DAG连接在一起,而且您没有任何特殊的要求,必须使用 SubDagOperator那么,我建议使用 TriggerDagRunOperator 反而 SubDag的有其 扰民份额.
  • 在这里阅读更多信息。为顶层DAG布线
© www.soinside.com 2019 - 2024. All rights reserved.