DAG在气流界面上不可见

问题描述 投票:0回答:1

这是我的dag文件,在 dags 文件夹。

Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from work_file import Test


class Main(Test):
    def __init__(self):
        super(Test, self).__init__()

    def create_dag(self):
        default_args = {
            "owner": "airflow",
            "depends_on_past": False,
            "start_date": datetime(2015, 6, 1),
            "email": ["[email protected]"],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": timedelta(minutes=5),
            # 'queue': 'bash_queue',
            # 'pool': 'backfill',
            # 'priority_weight': 10,
            # 'end_date': datetime(2016, 1, 1),
        }
        dag = DAG("python_dag", default_args=default_args, schedule_interval='0 * * * *')

        dummy_task = DummyOperator(task_id='dummy_task', retries=3)

        python_task = PythonOperator(task_id='python_task', python_callable=self.my_func)

        dummy_task >> python_task


if __name__ == "__main__":
    a = Main()
    a.create_dag()

这是我的另一个文件 work_file.py 同为 dags 文件夹。

class Test:
    def __init__(self):
        pass

    def my_func(self):
        return "Hello"

宗旨目的是调用 my_func 从我的dag文件中。

问题:- UI上似乎没有错误,但我的dag-------。python_dag 是不可见的。

我的服务器,调度程序也在运行,我试过重启同样的服务器,但没有任何反应。

我也导入了文件(from work_file import Test)

先谢谢你!

airflow directed-acyclic-graphs airflow-scheduler
1个回答
2
投票

DAG存在多个问题。

  1. 运算符没有分配给任何DAG。添加 dag=dag 到构造者。如:, DummyOperator(..., dag=dag).
  2. create_dag() 不返回DAG。添加 return dag.
  3. DAG脚本不作为顶层代码执行。也就是说,模块 __name__ 从来没有 '__main__'. 移除 if __name__ == "__main__":.
  4. DAG对象必须在模块的全局命名空间中。将DAG对象的返回值分配给 create_dag() 变成一个变量。dag = a.create_dag().
© www.soinside.com 2019 - 2024. All rights reserved.