Airflow:发布动态创建的dag

问题描述 投票:0回答:1

我希望能够从我的代码中发布和触发DAG对象,该代码不受调度程序的控制(即$ AIRFLOW_HOME / dags文件夹)

我的最后一招是以编程方式创建一个包含我要发布的DAG定义的py文件,并将此文件保存到$ AIRFLOW_HOME / dags文件夹中。我相信它应该比那更容易。

以下是我尝试过的。

import airflow
from airflow import DAG
from datetime import timedelta

from airflow.models import DagPickle
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.db import provide_session


@provide_session
def submit_dag(session=None):
    args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = DAG(
        dag_id='sample', default_args=args,
        schedule_interval=None, start_date=airflow.utils.dates.days_ago(2),
        dagrun_timeout=timedelta(minutes=60))
    task = DummyOperator(task_id='one', dag=dag)
    dag_pickle = DagPickle(task)
    session.add(dag_pickle)
    session.commit()


submit_dag()

上面的代码确实在dag_pickle表中创建了条目但是如何发布并稍后触发这个dag?

apache airflow
1个回答
0
投票

我可以做pickle.dump(dag,open(DAGS_FOLDER / pickled_dags,'wb'))并在DAGS FOLDER中有一个pickle.load文件(DAGS_FOLDER / pickled_dags)

© www.soinside.com 2019 - 2024. All rights reserved.