如何将 use_legacy_sql=False 传递到具有 BigQuery 连接的 Airflow DAG 中的 SqlSensor?

问题描述 投票:0回答:4

自 Apache Airflow 1.9 发布以来,我一直在使用自己的自定义 SqlSensor,因为我无法使用在 Google BigQuery 上运行的标准 SQL 语句中包含的 SqlSensor,因为默认设置是 使用旧版 SQL

我检查了最近的1.10.3版本,情况似乎仍然如此。除了使用我自己的 SQL 传感器作为插件之外,还有其他方法可以完成这项工作吗?

google-bigquery airflow
4个回答
2
投票

我找到的最快的解决方案是

  • 定义一个新类 BigQuerySqlSensor
  • 重写
    _get_hook
    方法
  • 在覆盖中设置
    use_legacy_sql=False
  • 返回更新后的钩子
from airflow.sensors.sql_sensor import SqlSensor

class BigQuerySqlSensor(SqlSensor):
    def _get_hook(self):
        hook = super()._get_hook()
        hook.use_legacy_sql = False
        return hook

sense_stuff = BigQuerySqlSensor(
        dag=dag,
        task_id='sense_stuff',
        conn_id='the_connection_id',
        sql="SELECT COUNT(*) FROM some_table",
        mode='reschedule',
        poke_interval=600,
        timeout=(3600)
    )


1
投票

更新您的自定义传感器以将

use_legacy_sql=False
传递到 BigQueryHook。

hook = BigQueryHook(
            bigquery_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to,
            use_legacy_sql=False
       )

0
投票

从文档中的启用标准 SQL 主题来看,如果您无法直接设置选项,另一种方法是使用

#standardSQL
shebang,例如:

#standardSQL
SELECT x
FROM UNNEST([1, 2, 3]) AS x;

应该可以使用此前缀提交查询来覆盖设置。


0
投票

您也可以将其传递为

hook_param

wait = SqlSensor(
    task_id="wait_task",
    conn_id="bigquery",
    sql="SELECT 1",
    fail_on_empty=False,
    poke_interval=60,  # 1 minute
    mode="reschedule",
    timeout=60 * 117,
    hook_params={
        "use_legacy_sql": False,
        "location": "us",
        "api_resource_configs": {"query": {"useQueryCache": False}},
    },
)
© www.soinside.com 2019 - 2024. All rights reserved.