如何使用 psycopg2 从 PostgreSQL 加载数据块

问题描述 投票:0回答:2

我想迭代地从表中加载批次并将每个批次保存为 .parquet 格式。 问题是我不明白如何用 psycopg2 做到这一点。

conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
cursor = conn.cursor()
cursor.execute(query)
columns = [column[0] for column in cursor.description]
records = cursor.fetchmany(size=5)
pd.DataFrame(data=records, columns=columns).to_parquet(...)

上面的代码选择超过 5 行。

我想做这样的事情:

    conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
    cursor = conn.cursor()
    cursor.execute(query)
    columns = [column[0] for column in cursor.description]
    records = cursor.fetchmany(size=5) #iterator with batches
    for batch in records:
        pd.DataFrame(data=records, columns=columns).to_parquet(...)

提前非常感谢您的帮助

python pandas psycopg2 parquet
2个回答
0
投票

一个快速而肮脏的例子:

with con.cursor() as cur:
    cur.execute(qry)
    for row in cur:
        records = cur.fetchmany(2)
        pd.DataFrame(data=records, columns=columns).to_parquet(...)

0
投票

对于这个简单的迭代,您需要一个生成器模式。

幸运的是,在Python中很简单,使用

yield

cursor = conn.cursor()
cursor.execute(query)
columns = [column[0] for column in cursor.description]


def query_batch(cursor, batch_size):
    while True: 
        record_batch = cursor.fetchmany(size=batch_size)
        if record_batch == []: 
            break
        yield record_batch
        
for batch in query_batch(cursor, batch_size=5):
    pd.DataFrame(data=batch, columns=columns).to_parquet(...)

小心,另一个回复不正确,每次 for 循环迭代都会跳过并错过 1 条记录。

请将此标记为已接受的答案。

© www.soinside.com 2019 - 2024. All rights reserved.