如何将数据写入Redshift,这是在Python中创建的数据帧的结果?

问题描述 投票:12回答:6

我有一个Python数据框。我可以将此数据作为新表写入Redshift吗?我已经成功创建了与Redshift的数据库连接,并且能够执行简单的SQL查询。现在我需要写一个数据帧。

python pandas dataframe amazon-redshift psycopg2
6个回答
32
投票

您可以使用to_sql将数据推送到Redshift数据库。我已经能够通过SQLAlchemy引擎连接到我的数据库。一定要在你的index = False电话中设置to_sql。如果表不存在,将创建该表,并且您可以指定是否要调用替换表,附加到表,或者如果表已存在则失败。

from sqlalchemy import create_engine
import pandas as pd

conn = create_engine('postgresql://username:[email protected]:5439/yourdatabase')

df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])

df.to_sql('your_table', conn, index=False, if_exists='replace')

请注意,您可能需要pip install psycopg2才能通过SQLAlchemy连接到Redshift。

to_sql Documentation


9
投票
import pandas_redshift as pr

pr.connect_to_redshift(dbname = <dbname>,
                        host = <host>,
                        port = <port>,
                        user = <user>,
                        password = <password>)

pr.connect_to_s3(aws_access_key_id = <aws_access_key_id>,
                aws_secret_access_key = <aws_secret_access_key>,
                bucket = <bucket>,
                subdirectory = <subdirectory>)

# Write the DataFrame to S3 and then to redshift
pr.pandas_to_redshift(data_frame = data_frame,
                        redshift_table_name = 'gawronski.nba_shots_log')

详细信息:https://github.com/agawronski/pandas_redshift


4
投票

假设您可以访问S3,这种方法应该有效:

步骤1:将DataFrame作为csv写入S3(我使用AWS SDK boto3) 第2步:您从DataFrame了解Redshift表的列,数据类型和键/索引,因此您应该能够生成create table脚本并将其推送到Redshift以创建一个空表 步骤3:从Python环境向Redshift发送copy命令,将S3中的数据复制到步骤2中创建的空表中

每次都像魅力一样。

第4步:在您的云存储人员开始大喊大叫之前,从S3删除csv

如果你看到自己多次这样做,将函数中的所有四个步骤包装起来就会保持整洁。


4
投票

我尝试使用pandas df.to_sql(),但速度非常慢。插入50行我花了10多分钟。见this公开问题(撰写时)

我尝试使用火焰生态系统中的odo(根据问题讨论中的建议),但面对的是​​ProgrammingError,我没有费心去调查。

最后有效的:

import psycopg2

# Fill in the blanks for the conn object
conn = psycopg2.connect(user = 'user',
                              password = 'password',
                              host = 'host',
                              dbname = 'db',
                              port = 666)
cursor = conn.cursor()

args_str = b','.join(cursor.mogrify("(%s,%s,...)", x) for x in tuple(map(tuple,np_data)))
cursor.execute("insert into table (a,b,...) VALUES "+args_str.decode("utf-8"))

cursor.close()
conn.commit()
conn.close()

是的,平原老psycopg2。这是一个numpy阵列,但从df转换为ndarray不应该太困难。这给了我大约3k行/分钟。

但是,根据其他团队成员的建议,最快的解决方案是在将数据帧作为TSV / CSV转储到S3集群然后复制之后使用COPY命令。如果您要复制非常庞大的数据集,则应该对此进行调查。 (如果我尝试的话,我会在这里更新)


2
投票

我曾经依靠pandas to_sql()功能,但它太慢了。我最近改用了以下内容:

import pandas as pd
import s3fs # great module which allows you to read/write to s3 easily
import sqlalchemy

df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])

s3 = s3fs.S3FileSystem(anon=False)
filename = 'my_s3_bucket_name/file.csv'
with s3.open(filename, 'w') as f:
    df.to_csv(f, index=False, header=False)

con = sqlalchemy.create_engine('postgresql://username:[email protected]:5439/yourdatabase')
# make sure the schema for mytable exists

# if you need to delete the table but not the schema leave DELETE mytable
# if you want to only append, I think just removing the DELETE mytable would work

con.execute("""
    DELETE mytable;
    COPY mytable
    from 's3://%s'
    iam_role 'arn:aws:iam::xxxx:role/role_name'
    csv;""" % filename)

该角色必须允许对S3进行红移访问,请参阅here以获取更多详细信息

我发现对于一个300KB的文件(12000x2数据帧),这需要4秒,相比之下,我获得了pandas to_sql()函数的8分钟


0
投票

出于本次对话的目的,Postgres = RedShift您有两种选择:

选项1:

来自熊猫:http://pandas.pydata.org/pandas-docs/stable/io.html#io-sql

pandas.io.sql模块提供了一组查询包装器,以便于数据检索和减少对特定于DB的API的依赖性。 SQLAlchemy提供了数据库抽象(如果已安装)。此外,您还需要一个数据库驱动程序库。这种驱动程序的例子是PostgreSQL的psycopg2或MySQL的pymysql。

编写DataFrames

假设DataFrame数据中包含以下数据,我们可以使用to_sql()将其插入数据库。

id  Date    Col_1   Col_2   Col_3
26  2012-10-18  X   25.7    True
42  2012-10-19  Y   -12.4   False
63  2012-10-20  Z   5.73    True

In [437]: data.to_sql('data', engine)

对于某些数据库,写入大型DataFrame可能会因超出数据包大小限制而导致错误。通过在调用to_sql时设置chunksize参数可以避免这种情况。例如,以下将数据一次批量写入数据库1000行:

In [438]: data.to_sql('data_chunked', engine, chunksize=1000)

选项2

或者你可以简单地做你自己的如果你有一个名为data的数据框,只需使用iterrows循环它:

for row in data.iterrows():

然后将每行添加到数据库中。我会为每一行使用copy而不是insert,因为它会更快。

http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from

© www.soinside.com 2019 - 2024. All rights reserved.