我在使用 BigQueryOperator 的 Airflow DAG 任务中遇到问题。该任务涉及执行存储在“/home/airflow/gcs/dags/movies/sql/RRR/”中的 SQL 文件 (cast.sql),并且我使用 BigQueryOperator 中的 params 参数将参数传递到 SQL 文件。
sql_path = "/home/airflow/gcs/dags/movies/sql/RRR/cast.sql"
stage_latest_parameters = {
"project_id": project_id,
"dataset_name": dataset_name,
"date": raw_latest_date,
"source_table": source,
"dest_table": dest
}
sql_with_parameters_task = BigQueryOperator(
task_id='sql_with_parameters_task',
sql=sql_path,
params=stage_latest_parameters,
use_legacy_sql=False,
dag=dag
)
错误:
Exception rendering Jinja template for task 'sql_with_parameters_task', field 'sql'.
Template: '/home/airflow/gcs/dags/movies/sql/RRR/cast.sql'
...
jinja2.exceptions.TemplateNotFound: /home/airflow/gcs/dags/movies/sql/RRR/cast.sql
查询:
CREATE OR REPLACE TABLE `{project_id}.{dataset_name}.{dest_table}`
AS
SELECT *, {date} AS date
FROM `{project_id}.{dataset_name}.{source_table}`
在这里,我不需要使用任何模板。我只需要通过传递 GCS 存储桶内的必要参数来运行查询。我做错了什么?感谢您的帮助。
您必须使用DAG类的
template_searchpath
参数。将此参数指定为 SQL 脚本的父文件夹的路径。喜欢:
with DAG(
dag_id = 'example_dag',
template_searchpath = '/home/airflow/gcs/dags/movies/sql/RRR/'
) as dag:
您还可以提供一个列表而不是单个路径。之后,您可以在
BigQueryOperator
或BigQueryInsertJobOperator
中使用SQL脚本的名称。