Airflow SQLExecuteQueryOperator 如何在日志中打印消息

问题描述 投票:0回答:1

我正在使用 Airflow SQLExecuteQueryOperator 在 postgres 数据库上执行 pl/pgsql 脚本。在我的 pl/pgsql 脚本中,有 RAISE NOTICE 命令来打印信息。这些消息不会打印在气流日志中。有没有办法让Airflow日志打印sql执行消息?

重现示例:

达格.py

SQLExecuteQueryOperator(
   task_id = 't1',
   conn_id = 'conn1',
   sql = 't1.sql'
)

t1.sql

DO $$ 
BEGIN 
    RAISE NOTICE 'hello'; 
END $$;
postgresql airflow
1个回答
0
投票

Airflow

PostgresOperator
使用 psycopg2 二进制文件

因此,您必须在执行查询后读取连接的通知属性。

将处理程序选项添加到您的

PostgresOperator
,并在执行查询后从连接对象打印通知。

import logging


from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
import pendulum
import textwrap


logger = logging.getLogger(__name__)


def log_notices(cur, **kwargs):
    for notice in cur.connection.notices:
        logger.info(notice)

@dag(
    default_args=default_args,
    schedule=None,
    start_date=pendulum.from_format("2024-09-23", "YYYY-MM-DD"),
    catchup=False
)
def example_dag():
    postgres_operator_1 = PostgresOperator(
        sql=textwrap.dedent("""\
            DO $$ 
            BEGIN 
            RAISE NOTICE 'hello'; 
            END $$;"""),
        handler=log_notices,
        postgres_conn_id="exampledb",
        task_id="postgres_operator_1",
    )
© www.soinside.com 2019 - 2024. All rights reserved.