我想创建一个父DAG和几个子DAG,这些子DAG会通过 SubDagOperator 来调用。
如果我在设置 SubDagOperator 任务时只传入子DAG的名称,如下所示:
# Snippet from the question: `child_dag_name` is used as a bare identifier that
# is not defined in this scope, so evaluating this call raises the NameError
# quoted right after this snippet. The working example later in this file
# passes a DAG object (returned by a function call) as `subdag` instead.
task_1 = SubDagOperator(
task_id="task_1",
subdag=child_dag_name,
dag=parent_dag
)
我得到以下错误信息:
NameError: name 'child_dag_name' is not defined
这个答案同样依赖于Python的知识和Airflow的知识。
回顾:
- 在 Python 中:import 一个模块意味着该模块所有顶层(缩进0)的代码都会立即执行(在导入过程中)。
- 在 Airflow 中:只有出现在 DAG 定义文件顶层(缩进0)的 DAG 对象才会被 scheduler / webserver 识别。
牢记以上2点,你可以这么做:
- 把子DAG对象的创建封装成一个函数,放在例如 child_dag.py 文件中,并返回一个 DAG 对象;
- 在 parent_dag.py 文件中导入该函数;
- 在 SubDagOperator 任务中调用该函数来生成子DAG对象。
dag_object_builder.py
from typing import Dict, Any
from airflow.models import DAG
def create_dag_object(dag_id: str, dag_params: Dict[str, Any]) -> DAG:
    """Construct a DAG from an id and a dict of constructor keyword arguments.

    Args:
        dag_id: Identifier passed to ``DAG`` as ``dag_id``.
        dag_params: Mapping unpacked into the ``DAG`` constructor as
            keyword arguments.

    Returns:
        The newly created ``DAG`` instance.
    """
    return DAG(dag_id=dag_id, **dag_params)
child_dag.py
from datetime import datetime
from typing import Dict, Any
from airflow.models import DAG
from src.main.subdag_example import dag_object_builder
# Shared defaults handed to the child DAG through the "default_args" entry of
# the dag_params built in create_child_dag_object.
default_args: Dict[str, Any] = {
    "owner": "my_owner",
    "email": ["my_username@my_domain.com"],
    "weight_rule": "downstream",
    "retries": 1,
}
...
def create_child_dag_object(dag_id: str) -> DAG:
    """Build and return a new child DAG carrying the given ``dag_id``.

    Each call goes through ``dag_object_builder.create_dag_object`` and
    therefore yields a fresh DAG, so an importing module can create its
    own instance under whatever id it needs.

    Args:
        dag_id: Identifier to assign to the created DAG.

    Returns:
        The DAG produced by ``dag_object_builder.create_dag_object``.
    """
    child_dag_params: Dict[str, Any] = {
        "start_date": datetime(year=2019, month=7, day=10, hour=21, minute=30),
        "schedule_interval": None,
        "max_active_runs": 1,
        "default_view": "graph",
        "catchup": False,
        "default_args": default_args,
    }
    return dag_object_builder.create_dag_object(
        dag_id=dag_id,
        dag_params=child_dag_params,
    )
# Module-level DAG object: this statement runs whenever the module is
# imported, because it sits at the top level of the file.
my_child_dag: DAG = create_child_dag_object(
    dag_id="my_child_dag",
)
parent_dag.py
from datetime import datetime
from typing import Dict, Any
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from src.main.subdag_example import child_dag
from src.main.subdag_example import dag_object_builder
# Shared defaults handed to the parent DAG through the "default_args" entry of
# the dag_params used to build my_parent_dag.
default_args: Dict[str, Any] = {
    "owner": "my_owner",
    "email": ["my_username@my_domain.com"],
    "weight_rule": "downstream",
    "retries": 1,
}
# Parent DAG, created at module top level via the shared builder helper.
my_parent_dag: DAG = dag_object_builder.create_dag_object(
    dag_id="my_parent_dag",
    dag_params={
        "start_date": datetime(year=2019, month=7, day=10, hour=21, minute=30),
        "schedule_interval": None,
        "max_active_runs": 1,
        "default_view": "graph",
        "catchup": False,
        "default_args": default_args,
    },
)
...
# Task in the parent DAG that runs the child DAG as a sub-DAG.
# The child DAG is built here by calling the factory function, so it gets its
# own id instead of reusing the module-level `child_dag.my_child_dag`.
# SubDagOperator requires the sub-DAG's dag_id to be
# "<parent_dag_id>.<task_id>"; any other id (such as a plain "my_subdag")
# is rejected with an AirflowException when the operator is constructed.
my_subdag_task: SubDagOperator = SubDagOperator(
    task_id="my_subdag_task",
    dag=my_parent_dag,
    subdag=child_dag.create_child_dag_object(
        dag_id=f"{my_parent_dag.dag_id}.my_subdag_task",
    ),
)