我想迭代地从表中加载批次并将每个批次保存为 .parquet 格式。 问题是我不明白如何用 psycopg2 做到这一点。
conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
cursor = conn.cursor()
cursor.execute(query)
columns = [column[0] for column in cursor.description]
records = cursor.fetchmany(size=5)
pd.DataFrame(data=records, columns=columns).to_parquet(...)
上面的代码选择超过 5 行。
我想做这样的事情:
conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
cursor = conn.cursor()
cursor.execute(query)
columns = [column[0] for column in cursor.description]
records = cursor.fetchmany(size=5) #iterator with batches
for batch in records:
pd.DataFrame(data=records, columns=columns).to_parquet(...)
提前非常感谢您的帮助
一个快速而肮脏的例子:
with con.cursor() as cur:
cur.execute(qry)
for row in cur:
records = cur.fetchmany(2)
pd.DataFrame(data=records, columns=columns).to_parquet(...)
对于这个简单的迭代,您需要一个生成器模式。
幸运的是,在Python中很简单,使用
yield
:
cursor = conn.cursor()
cursor.execute(query)
columns = [column[0] for column in cursor.description]
def query_batch(cursor, batch_size):
while True:
record_batch = cursor.fetchmany(size=batch_size)
if record_batch == []:
break
yield record_batch
for batch in query_batch(cursor, batch_size=5):
pd.DataFrame(data=batch, columns=columns).to_parquet(...)
小心,另一个回复不正确,每次 for 循环迭代都会跳过并错过 1 条记录。
请将此标记为已接受的答案。