我知道如何使用Variable动态运行DAG的任务,它可以很好地工作,直到你为同一个DAG触发多次运行。
即,一旦在data / to / load / dir下创建了一个带有文件的新目录,我会在某处编写一个脚本,触发airflow variables -set dir data/to/load/$newDir
,然后是airflow trigger_dag dyn_test
。现在让我们说目录“a”和“b”在data / to / load /下创建(在相似的时间),这将使airflow variable + airflow trigger_dag
调用两次,在变量集调用上有两个不同的输入(一个后缀为'a',另一个带有'b'ofcourse)。我看到两个Jobs在气流GUI中运行DAG,但问题是他们都在考虑相同的目录值,a或b。这绝对意味着它需要最后的'气流变量设置'调用。我该如何解决?触发多次运行的方法是什么,每次运行采用不同的值(在dir变量中)以动态循环。我的Dag看起来像这样:
# Using Airflow Variables
from airflow.models import Variable
dir = Variable.get("dir")
args = {
'owner': 'airflow',
'start_date': datetime(2004, 11, 12),
}
dag = DAG(
dag_id='dyn_test',
default_args=args,
schedule_interval='@once'
)
filesInDir = next(os.walk(dir))[2]
for file in filesInDir:
task1 = # change 'file' structure
task2 = # store changed 'file'
task1 >> task2
您的问题中描述的场景是先进先出队列适合的情况,假设您希望保持当前显式设置要作为单独序列处理的目录的方式。
也就是说,Airflow CLI trigger_dags
命令允许传递--conf
标志来设置DagRun
中传递的配置字典,我将按照你所描述的那样设置变量的位置,触发dag。
http://airflow.apache.org/cli.html#trigger_dag
以下是代码中的内容。
airflow trigger_dag dyn_test --conf '{"me_seeks.dir": "data/to/load/$newDir"}'
您将在用于任务的气流操作员中设置provide_context
kwargs
。
可以在上下文中检索DagRun的实例,并在检索到的配置中设置dir
值
假设您使用Airflow PythonOperator
定义了您的任务;然后你在dir
中检索python_callable
的代码看起来类似于:
def me_seeks(dag_run=None):
dir = dag_run.conf['me_seeks.dir']