我正在尝试构建一个函数,将大块数据帧加载到 PostgreSQL 表中。分块等不是这个问题的一部分,所以我没有将其包含在最小的示例中。这个问题只关注函数
copy_chunk
,或者更准确地说,关注如何使用cur.copy
。受到this答案和psycopg文档的启发,我尝试了这个功能的运气:
import pandas as pd
import psycopg
from io import StringIO
def copy_chunk(
conn, # noqa: ANN001
df_chunk: pd.DataFrame,
table_name: str,
) -> None:
"""Upload a single chunk to the database using the COPY command."""
with conn.cursor() as cur:
# Create a buffer
buffer = StringIO()
df_chunk.to_csv(buffer, index=False, header=False)
buffer.seek(0)
# Load data into the table using copy method
with cur.copy(f'COPY "{table_name}" FROM STDIN WITH (FORMAT CSV)') as copy:
copy.write(buffer)
conn.commit()
# Example usage
conn_string = "postgresql://username:password@hostname:port/dbname"
df_chunk = pd.DataFrame({
'col1': [1, 2, 3],
'col2': ['A', 'B', 'C']
})
# Establish connection
with psycopg.connect(conn_string) as conn:
copy_chunk(conn, df_chunk, 'your_table_name')
我当前的问题是它不会引发任何异常,但也不会填满我的表格。你们能发现我的错误吗?
以下内容对我有用:
import pandas as pd
import psycopg
from io import StringIO
def copy_chunk(
conn, # noqa: ANN001
df_chunk: pd.DataFrame,
table_name: str,
) -> None:
"""Upload a single chunk to the database using the COPY command."""
with conn.cursor() as cur:
# Create a buffer
buffer = StringIO()
df_chunk.to_csv(buffer, index=False, header=False)
buffer.seek(0)
# Load data into the table using copy method
with buffer as f:
with cur.copy(f'COPY "{table_name}" FROM STDIN WITH (FORMAT CSV)') as copy:
while data := f.read(10):
copy.write(data)
conn.commit()
# Example usage
conn_string = "postgresql://postgres:@localhost:5432/test"
df_chunk = pd.DataFrame({
'col1': [1, 2, 3],
'col2': ['A', 'B', 'C']
})
# Establish connection
with psycopg.connect(conn_string) as conn:
copy_chunk(conn, df_chunk, 'pandas_test')
结果是:
select * from pandas_test ;
col1 | col2
------+------
1 | A
2 | B
3 | C
(3 rows)