自 Apache Airflow 1.9 发布以来,我一直在使用自己的自定义 SqlSensor,因为我无法使用在 Google BigQuery 上运行的标准 SQL 语句中包含的 SqlSensor,因为默认设置是 使用旧版 SQL。
我检查了最近的1.10.3版本,情况似乎仍然如此。除了使用我自己的 SQL 传感器作为插件之外,还有其他方法可以完成这项工作吗?
我找到的最快的解决方案是
_get_hook
方法use_legacy_sql=False
from airflow.sensors.sql_sensor import SqlSensor
class BigQuerySqlSensor(SqlSensor):
def _get_hook(self):
hook = super()._get_hook()
hook.use_legacy_sql = False
return hook
sense_stuff = BigQuerySqlSensor(
dag=dag,
task_id='sense_stuff',
conn_id='the_connection_id',
sql="SELECT COUNT(*) FROM some_table",
mode='reschedule',
poke_interval=600,
timeout=(3600)
)
更新您的自定义传感器以将
use_legacy_sql=False
传递到 BigQueryHook。
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to,
use_legacy_sql=False
)
从文档中的启用标准 SQL 主题来看,如果您无法直接设置选项,另一种方法是使用
#standardSQL
shebang,例如:
#standardSQL
SELECT x
FROM UNNEST([1, 2, 3]) AS x;
应该可以使用此前缀提交查询来覆盖设置。
您也可以将其传递为
hook_param
wait = SqlSensor(
task_id="wait_task",
conn_id="bigquery",
sql="SELECT 1",
fail_on_empty=False,
poke_interval=60, # 1 minute
mode="reschedule",
timeout=60 * 117,
hook_params={
"use_legacy_sql": False,
"location": "us",
"api_resource_configs": {"query": {"useQueryCache": False}},
},
)