使用 BigQueryOperator 和包含模板字段的 SQL 文件渲染 Airflow 任务的 Jinja 模板时出错

问题描述 投票:0回答:1

我在使用 BigQueryOperator 的 Airflow DAG 任务中遇到问题。该任务涉及执行存储在“/home/airflow/gcs/dags/movies/sql/RRR/”中的 SQL 文件 (cast.sql),并且我使用 BigQueryOperator 中的 params 参数将参数传递到 SQL 文件。

sql_path = "/home/airflow/gcs/dags/movies/sql/RRR/cast.sql"

stage_latest_parameters = {
    "project_id": project_id,
    "dataset_name": dataset_name,
    "date": raw_latest_date,
    "source_table": source,
    "dest_table": dest
}

sql_with_parameters_task = BigQueryOperator(
    task_id='sql_with_parameters_task',
    sql=sql_path,
    params=stage_latest_parameters,
    use_legacy_sql=False,
    dag=dag
)

错误:

 Exception rendering Jinja template for task 'sql_with_parameters_task', field 'sql'. 
 Template: '/home/airflow/gcs/dags/movies/sql/RRR/cast.sql'
  ...
 jinja2.exceptions.TemplateNotFound: /home/airflow/gcs/dags/movies/sql/RRR/cast.sql

查询:

  CREATE OR REPLACE TABLE `{project_id}.{dataset_name}.{dest_table}` 
  AS
  SELECT *, {date} AS date
  FROM `{project_id}.{dataset_name}.{source_table}`

在这里,我不需要使用任何模板。我只需要通过传递 GCS 存储桶内的必要参数来运行查询。我做错了什么?感谢您的帮助。

airflow google-cloud-composer airflow-2.x google-cloud-storage
1个回答
0
投票

您必须使用DAG类的

template_searchpath
参数。将此参数指定为 SQL 脚本的父文件夹的路径。喜欢:

with DAG(
    dag_id = 'example_dag',
    template_searchpath = '/home/airflow/gcs/dags/movies/sql/RRR/'
) as dag:

您还可以提供一个列表而不是单个路径。之后,您可以在

BigQueryOperator
BigQueryInsertJobOperator
中使用SQL脚本的名称。

© www.soinside.com 2019 - 2024. All rights reserved.