我正在使用MySqlOperator编写DAG,并且需要将结果从macro.add(ds,1)
传递到MySQL查询。请注意,MySQL查询存储在单独的.sql文件中。
我的DAG任务是:
t2 = MySqlOperator(task_id='load_dv_hub',
mysql_conn_id="zeus_root",
sql="dv_candidate_login.sql",
params={"window_end_date": '{{ macros.ds_add(ds, 1) }}'})
而且,dv_candidate_login.sql是:
FROM stg_candidate_login as s
WHERE s.load_dts >= "{{ ds }}"
AND s.load_dts < "{{ params.window_end_date }}"
关于我在做什么错的任何想法?
参数没有模板化,这意味着您不能在其中使用macros.ds_add。您可以将模板化的代码移动到sql文件本身中,这将解决问题。
由于参数,您要传递的是一个字符串,可以通过字符串格式传递宏。
代码(未经测试)
t2 = MySqlOperator(task_id='load_dv_hub',
mysql_conn_id="zeus_root",
sql="dv_candidate_login.sql",
params={"window_end_date": '{}'.format("{{ macros.ds_add(ds, 1) }}")})