Using psycopg2 to get notified when new data is inserted into the database


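I am learning how to use PostgreSQL and am trying to use psycopg2 to get notified of any subsequent inserts into a database table. So far I have been using the LISTEN/NOTIFY feature, but it is not working the way I intended: I only receive a notification when I run a command like this in the psql shell: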

NOTIFY test, 'hello';

Instead, I want to be notified of statements like this:

INSERT INTO test (id, date, cases, deaths, recovered) VALUES ('99999996','9999-12-21',1,1,5);
The code I am running is here:

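import select

import psycopg2
from psycopg2 import extras
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

# download_csv is a helper defined elsewhere (not shown);
# collated_data is used as a pandas DataFrame below
collated_data = download_csv("https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv",
             "https://raw.githubusercontent.com/datasets/covid-19/master/data/time-series-19-covid-combined.csv")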

# connect to our new DB
with psycopg2.connect(
        "dbname="" user="" password="" host=""") as conn:
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

    # open a cursor to perform database operations
    with conn.cursor() as cur:
        cur.execute("""
                    CREATE TABLE IF NOT EXISTS test (
                        id serial PRIMARY KEY,
                        date date,
                        cases integer,
                        deaths integer,
                        recovered integer);
                        
                    CREATE OR REPLACE FUNCTION add_task_notify()
                    RETURNS trigger AS
                    $BODY$
                    BEGIN 
                    PERFORM pg_notify('test_item_added',
                            json_build_object(
                                'test_id', new.id,
                                'test_date', new.date,
                                'test_cases', new.cases,
                                'test_deaths', new.deaths,
                                'test_recovered', new.recovered)::text);
                    RETURN NEW;
                    END;
                    $BODY$
                    LANGUAGE plpgsql VOLATILE
                    COST 100;
                    ALTER FUNCTION add_task_notify()
                    OWNER to user;
                    
                    CREATE TRIGGER add_task_event_trigger
                    AFTER INSERT
                    ON test
                    FOR EACH ROW
                    EXECUTE PROCEDURE add_task_notify();
                    
        """)
        conn.commit()

        cur.execute("LISTEN test;")
        print("Waiting for notifications on channel 'test'")

        tuples = [tuple(x) for x in collated_data.to_numpy()]

        cols = ','.join(list(collated_data.columns))
        query = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s)" % ('test', cols)

        extras.execute_batch(cur, query, tuples)

        conn.commit()

        while True:
            if select.select([conn], [], [], 5) != ([], [], []):
                conn.poll()
                while conn.notifies:
                    notify = conn.notifies.pop(0)
                    print(notify)
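
One thing that stands out in the code above, and may be the cause: the trigger function sends on the channel test_item_added (the first argument to pg_notify), while the client runs LISTEN test. A session only receives notifications for channels it is listening on, which would explain why a manual NOTIFY test, 'hello'; arrives but the trigger's notifications do not. The sketch below shows the listening side with the channel names matched. It assumes conn is an already open psycopg2 connection in autocommit mode; drain_notifications and listen_forever are hypothetical helper names, not part of psycopg2.

```python
import json
import select


def drain_notifications(conn):
    """Poll the connection and return (channel, decoded payload) pairs.

    The payload is assumed to be the JSON text built by json_build_object
    in the trigger function.
    """
    conn.poll()
    received = []
    while conn.notifies:
        notify = conn.notifies.pop(0)
        received.append((notify.channel, json.loads(notify.payload)))
    return received


def listen_forever(conn, timeout=5):
    """Listen on the channel the trigger notifies on and print each event."""
    with conn.cursor() as cur:
        # Must match the first argument of pg_notify() in add_task_notify()
        cur.execute("LISTEN test_item_added;")
    while True:
        # Wait until the socket is readable or the timeout expires
        select.select([conn], [], [], timeout)
        # Poll on every pass, including timeouts, so anything already
        # buffered on the connection is picked up as well
        for channel, data in drain_notifications(conn):
            print(channel, data)
```

Calling listen_forever(conn) after the trigger has been created should then print one decoded payload per inserted row, whether the INSERT comes from this script or from a separate psql session. This has not been run against the asker's database, so treat it as a starting point rather than a confirmed fix.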
python postgresql psycopg2