如何使用Airflow变量动态地为DAG运行多个Job

问题描述 投票:1回答:1

我知道如何使用Variable动态运行DAG的任务,它可以很好地工作,直到你为同一个DAG触发多次运行。

即,一旦在data / to / load / dir下创建了一个带有文件的新目录,我会在某处编写一个脚本,触发airflow variables -set dir data/to/load/$newDir,然后是airflow trigger_dag dyn_test。现在让我们说目录“a”和“b”在data / to / load /下创建(在相似的时间),这将使airflow variable + airflow trigger_dag调用两次,在变量集调用上有两个不同的输入(一个后缀为'a',另一个带有'b'ofcourse)。我看到两个Jobs在气流GUI中运行DAG,但问题是他们都在考虑相同的目录值,a或b。这绝对意味着它需要最后的'气流变量设置'调用。我该如何解决?触发多次运行的方法是什么,每次运行采用不同的值(在dir变量中)以动态循环。我的Dag看起来像这样:

# Using Airflow Variables
from airflow.models import Variable
dir = Variable.get("dir")


args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


filesInDir = next(os.walk(dir))[2] 

for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'

    task1 >> task2
python airflow
1个回答
1
投票

您的问题中描述的场景是先进先出队列适合的情况,假设您希望保持当前显式设置要作为单独序列处理的目录的方式。

也就是说,Airflow CLI trigger_dags命令允许传递--conf标志来设置DagRun中传递的配置字典,我将按照你所描述的那样设置变量的位置,触发dag。

http://airflow.apache.org/cli.html#trigger_dag

以下是代码中的内容。

airflow trigger_dag dyn_test --conf '{"me_seeks.dir": "data/to/load/$newDir"}'

您将在用于任务的气流操作员中设置provide_context kwargs

可以在上下文中检索DagRun的实例,并在检索到的配置中设置dir

假设您使用Airflow PythonOperator定义了您的任务;然后你在dir中检索python_callable的代码看起来类似于:

def me_seeks(dag_run=None):
    dir = dag_run.conf['me_seeks.dir']
© www.soinside.com 2019 - 2024. All rights reserved.