空气气流中,运行时动态任务

问题描述 投票:13回答:2

关于“动态任务的其他问题似乎解决在计划或设计时DAG的动态构建。我感兴趣的是在执行过程中动态地添加任务到DAG。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('test_dag', description='a test',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1),
          catchup=False)

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    python_callable=make_tasks)

这种幼稚的做法似乎并没有工作 - 虚拟任务从来没有在UI中显示。

什么是执行过程中的新运营商加入到DAG的正确方法是什么?可能吗?

python airflow airflow-scheduler
2个回答
4
投票

它也不可能修改其执行过程中DAG(没有更多的工作)。

dag = DAG(...是由调度循环回升。这将有任务实例'python_operator'在里面。这任务实例被安排在DAG运行,并通过一个工人或执行人执行。由于在气流DB DAG模型仅由调度器更新了这些追加的虚拟任务将无法坚持到DAG也没有计划运行。当工人离开,他们会被人遗忘。除非你复制从关于坚持和更新模型调度所有的代码...但是,将在下一次撤消的调度访问解析的DAG文件,该文件可能会发生一分钟一次,一旦秒或更快,这取决于有多少其他DAG文件有解析。

气流实际上想要每个DAG大约停留运行之间相同的布局。它还希望重新加载/解析不断DAG文件。所以,虽然你可以做一个DAG文件,该文件在每次运行时动态地确定基于一些外部数据(最好是在一个文件或PYC模块缓存的任务,而不是网络I /像一个DB查询O,你会整个调度循环减慢所有的DAG)这不是一个很好的计划,你的图形和树视图将得到所有混乱,而你的调度解析将对通过查找更加征税。

你可以做的可调用运行的每个任务...

def make_tasks(context):
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    provides_context=true,

但是,这是连续的,你必须找出如何使用Python,使它们相互平行(利用期货?),如果任何引发异常的整个任务失败。此外,它被绑定到一个遗嘱执行人或工作者,让不使用气流的任务分配(kubernetes,mesos,芹菜)。

另一种方式与这个工作是在运行时添加任务的固定数量(最大数量),并使用赎回(S)短路不需要的任务或推动与XCOM参数为他们每个人,改变他们的行为但不改变DAG。


1
投票

关于你的代码示例,你永远不打电话给你的功能,注册您的任务在你的DAG。

有一种动态的任务,你可以有一个单一的运营商根据一些国家的不同的东西,或者你可以拥有它可以根据状态被跳过,具有ShortCircuitOperator运营商屈指可数其中一样。

© www.soinside.com 2019 - 2024. All rights reserved.