我正在使用 Airflow SQLExecuteQueryOperator 在 postgres 数据库上执行 pl/pgsql 脚本。在我的 pl/pgsql 脚本中,有 RAISE NOTICE 命令来打印信息。这些消息不会打印在气流日志中。有没有办法让Airflow日志打印sql执行消息?
重现示例:
达格.py
SQLExecuteQueryOperator(
task_id = 't1',
conn_id = 'conn1',
sql = 't1.sql'
)
t1.sql
DO $$
BEGIN
RAISE NOTICE 'hello';
END $$;
Airflow
PostgresOperator
使用 psycopg2 二进制文件。
因此,您必须在执行查询后读取连接的通知属性。
将处理程序选项添加到您的
PostgresOperator
,并在执行查询后从连接对象打印通知。
import logging
from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
import pendulum
import textwrap
logger = logging.getLogger(__name__)
def log_notices(cur, **kwargs):
for notice in cur.connection.notices:
logger.info(notice)
@dag(
default_args=default_args,
schedule=None,
start_date=pendulum.from_format("2024-09-23", "YYYY-MM-DD"),
catchup=False
)
def example_dag():
postgres_operator_1 = PostgresOperator(
sql=textwrap.dedent("""\
DO $$
BEGIN
RAISE NOTICE 'hello';
END $$;"""),
handler=log_notices,
postgres_conn_id="exampledb",
task_id="postgres_operator_1",
)