我有一个Python数据框。我可以将此数据作为新表写入Redshift吗?我已经成功创建了与Redshift的数据库连接,并且能够执行简单的SQL查询。现在我需要写一个数据帧。
您可以使用to_sql
将数据推送到Redshift数据库。我已经能够通过SQLAlchemy引擎连接到我的数据库。一定要在你的index = False
电话中设置to_sql
。如果表不存在,将创建该表,并且您可以指定是否要调用替换表,附加到表,或者如果表已存在则失败。
from sqlalchemy import create_engine
import pandas as pd
conn = create_engine('postgresql://username:[email protected]:5439/yourdatabase')
df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
df.to_sql('your_table', conn, index=False, if_exists='replace')
请注意,您可能需要pip install psycopg2
才能通过SQLAlchemy连接到Redshift。
import pandas_redshift as pr
pr.connect_to_redshift(dbname = <dbname>,
host = <host>,
port = <port>,
user = <user>,
password = <password>)
pr.connect_to_s3(aws_access_key_id = <aws_access_key_id>,
aws_secret_access_key = <aws_secret_access_key>,
bucket = <bucket>,
subdirectory = <subdirectory>)
# Write the DataFrame to S3 and then to redshift
pr.pandas_to_redshift(data_frame = data_frame,
redshift_table_name = 'gawronski.nba_shots_log')
假设您可以访问S3,这种方法应该有效:
步骤1:将DataFrame作为csv写入S3(我使用AWS SDK boto3)
第2步:您从DataFrame了解Redshift表的列,数据类型和键/索引,因此您应该能够生成create table
脚本并将其推送到Redshift以创建一个空表
步骤3:从Python环境向Redshift发送copy
命令,将S3中的数据复制到步骤2中创建的空表中
每次都像魅力一样。
第4步:在您的云存储人员开始大喊大叫之前,从S3删除csv
如果你看到自己多次这样做,将函数中的所有四个步骤包装起来就会保持整洁。
我尝试使用pandas df.to_sql()
,但速度非常慢。插入50行我花了10多分钟。见this公开问题(撰写时)
我尝试使用火焰生态系统中的odo
(根据问题讨论中的建议),但面对的是ProgrammingError
,我没有费心去调查。
最后有效的:
import psycopg2
# Fill in the blanks for the conn object
conn = psycopg2.connect(user = 'user',
password = 'password',
host = 'host',
dbname = 'db',
port = 666)
cursor = conn.cursor()
args_str = b','.join(cursor.mogrify("(%s,%s,...)", x) for x in tuple(map(tuple,np_data)))
cursor.execute("insert into table (a,b,...) VALUES "+args_str.decode("utf-8"))
cursor.close()
conn.commit()
conn.close()
是的,平原老psycopg2
。这是一个numpy阵列,但从df
转换为ndarray
不应该太困难。这给了我大约3k行/分钟。
但是,根据其他团队成员的建议,最快的解决方案是在将数据帧作为TSV / CSV转储到S3集群然后复制之后使用COPY命令。如果您要复制非常庞大的数据集,则应该对此进行调查。 (如果我尝试的话,我会在这里更新)
我曾经依靠pandas to_sql()
功能,但它太慢了。我最近改用了以下内容:
import pandas as pd
import s3fs # great module which allows you to read/write to s3 easily
import sqlalchemy
df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}])
s3 = s3fs.S3FileSystem(anon=False)
filename = 'my_s3_bucket_name/file.csv'
with s3.open(filename, 'w') as f:
df.to_csv(f, index=False, header=False)
con = sqlalchemy.create_engine('postgresql://username:[email protected]:5439/yourdatabase')
# make sure the schema for mytable exists
# if you need to delete the table but not the schema leave DELETE mytable
# if you want to only append, I think just removing the DELETE mytable would work
con.execute("""
DELETE mytable;
COPY mytable
from 's3://%s'
iam_role 'arn:aws:iam::xxxx:role/role_name'
csv;""" % filename)
该角色必须允许对S3进行红移访问,请参阅here以获取更多详细信息
我发现对于一个300KB的文件(12000x2数据帧),这需要4秒,相比之下,我获得了pandas to_sql()
函数的8分钟
出于本次对话的目的,Postgres = RedShift您有两种选择:
选项1:
来自熊猫:http://pandas.pydata.org/pandas-docs/stable/io.html#io-sql
pandas.io.sql模块提供了一组查询包装器,以便于数据检索和减少对特定于DB的API的依赖性。 SQLAlchemy提供了数据库抽象(如果已安装)。此外,您还需要一个数据库驱动程序库。这种驱动程序的例子是PostgreSQL的psycopg2或MySQL的pymysql。
编写DataFrames
假设DataFrame数据中包含以下数据,我们可以使用to_sql()将其插入数据库。
id Date Col_1 Col_2 Col_3
26 2012-10-18 X 25.7 True
42 2012-10-19 Y -12.4 False
63 2012-10-20 Z 5.73 True
In [437]: data.to_sql('data', engine)
对于某些数据库,写入大型DataFrame可能会因超出数据包大小限制而导致错误。通过在调用to_sql时设置chunksize参数可以避免这种情况。例如,以下将数据一次批量写入数据库1000行:
In [438]: data.to_sql('data_chunked', engine, chunksize=1000)
选项2
或者你可以简单地做你自己的如果你有一个名为data的数据框,只需使用iterrows循环它:
for row in data.iterrows():
然后将每行添加到数据库中。我会为每一行使用copy而不是insert,因为它会更快。
http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from