我在从 MS SQL Server 数据库查询包含超过 500 万条记录的表时遇到问题。我想选择所有记录,但在将大量数据选择到内存中时,我的代码似乎失败了。
这有效:
import pandas.io.sql as psql
sql = "SELECT TOP 1000000 * FROM MyTable"
data = psql.read_frame(sql, cnxn)
...但这不起作用:
sql = "SELECT TOP 2000000 * FROM MyTable"
data = psql.read_frame(sql, cnxn)
它返回此错误:
File "inference.pyx", line 931, in pandas.lib.to_object_array_tuples
(pandas\lib.c:42733) Memory Error
我已阅读here,从 csv 文件创建
dataframe
时存在类似的问题,解决方法是使用“iterator”和“chunksize”参数,如下所示:
read_csv('exp4326.csv', iterator=True, chunksize=1000)
是否有类似的从 SQL 数据库查询的解决方案?如果不是,首选的解决方法是什么?我应该使用其他方法来读取块记录吗?我在here读到了一些关于在 pandas 中处理大型数据集的讨论,但执行 SELECT * 查询似乎需要做很多工作。当然有更简单的方法。
正如评论中提到的,从 pandas 0.15 开始,
read_sql
中有一个 chunksize 选项来逐块读取和处理查询块:
sql = "SELECT * FROM My_Table"
for chunk in pd.read_sql_query(sql , engine, chunksize=5):
print(chunk)
参考:http://pandas.pydata.org/pandas-docs/version/0.15.2/io.html#querying
更新:请务必查看下面的答案,因为 Pandas 现在内置了对分块加载的支持。
您可以简单地尝试按块读取输入表,然后从各个部分组装完整的数据帧,如下所示:
import pandas as pd
import pandas.io.sql as psql
chunk_size = 10000
offset = 0
dfs = []
while True:
sql = "SELECT * FROM MyTable limit %d offset %d order by ID" % (chunk_size,offset)
dfs.append(psql.read_frame(sql, cnxn))
offset += chunk_size
if len(dfs[-1]) < chunk_size:
break
full_df = pd.concat(dfs)
整个数据框也可能太大而无法放入内存,在这种情况下,除了限制您选择的行数或列数之外,您别无选择。
代码解决方案及备注。
# Create empty list
dfl = []
# Create empty dataframe
dfs = pd.DataFrame()
# Start Chunking
for chunk in pd.read_sql(query, con=conct, ,chunksize=10000000):
# Start Appending Data Chunks from SQL Result set into List
dfl.append(chunk)
# Start appending data from list to dataframe
dfs = pd.concat(dfl, ignore_index=True)
但是,我的内存分析告诉我,即使在提取每个块后释放内存,列表也会变得越来越大并占用该内存,导致净净没有获得可用 RAM。
很想听听作者/其他人怎么说。
我发现处理此问题的最佳方法是利用 SQLAlchemy steam_results 连接选项
conn = engine.connect().execution_options(stream_results=True)
并将 conn 对象传递给 pandas
pd.read_sql("SELECT *...", conn, chunksize=10000)
这将确保光标在服务器端而不是客户端进行处理
您可以使用 服务器端游标(又名流结果)
import pandas as pd
from sqlalchemy import create_engine
def process_sql_using_pandas():
engine = create_engine(
"postgresql://postgres:pass@localhost/example"
)
conn = engine.connect().execution_options(
stream_results=True)
for chunk_dataframe in pd.read_sql(
"SELECT * FROM users", conn, chunksize=1000):
print(f"Got dataframe w/{len(chunk_dataframe)} rows")
# ... do something with dataframe ...
if __name__ == '__main__':
process_sql_using_pandas()
正如其他人的评论中提到的,在
chunksize
中使用 pd.read_sql("SELECT * FROM users", engine, chunksize=1000)
参数并不能解决问题,因为它仍然将整个数据加载到内存中,然后将其逐块提供给您。
更多解释这里
chunksize 仍然加载内存中的所有数据,stream_results=True 就是答案。它是服务器端游标,用于加载给定块中的行并节省内存。在许多管道中有效使用,它在加载历史数据时也可能会有所帮助
stream_conn = engine.connect().execution_options(stream_results=True)
将 pd.read_sql 与 thechunksize 一起使用
pd.read_sql("SELECT * FROM SOURCE", stream_conn , chunksize=5000)
这是一句单行话。我能够将 49m 记录加载到数据帧中,而不会耗尽内存。
dfs = pd.concat(pd.read_sql(sql, engine, chunksize=500000), ignore_index=True)
您可以更新版本airflow。 例如,我在使用 docker-compose 的 2.2.3 版本中遇到了这个错误。
mysq 6.7
cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"
redis:
cpus: "0.5"
mem_reservation: "10M"
mem_limit: "250M"
气流网络服务器:
cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"
气流调度器:
cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"
气流工作者:
#cpus: "0.5"
#mem_reservation: "10M"
#mem_limit: "750M"
错误:任务已退出,返回代码为 Negsignal.SIGKILL
但是更新版本 来自阿帕奇/气流:2.3.4.
并使用 docker-compose 中配置的相同资源毫无问题地执行拉取
我的DAG提取器:
def getDataForSchema(表,连接,tmp_path,**kwargs):
conn=connect_sql_server(conecction)
query_count= f"select count(1) from {table['schema']}.{table['table_name']}"
logging.info(f"query: {query_count}")
real_count_rows = pd.read_sql_query(query_count, conn)
##sacar esquema de la tabla
metadataquery=f"SELECT COLUMN_NAME ,DATA_TYPE FROM information_schema.columns \
where table_name = '{table['table_name']}' and table_schema= '{table['schema']}'"
#logging.info(f"query metadata: {metadataquery}")
metadata = pd.read_sql_query(metadataquery, conn)
schema=generate_schema(metadata)
#logging.info(f"schema : {schema}")
#logging.info(f"schema: {schema}")
#consulta la tabla a extraer
query=f" SELECT {table['custom_column_names']} FROM {table['schema']}.{table['table_name']} "
logging.info(f"quere data :{query}")
chunksize=table["partition_field"]
data = pd.read_sql_query(query, conn, chunksize=chunksize)
count_rows=0
pqwriter=None
iteraccion=0
for df_row in data:
print(f"bloque {iteraccion} de total {count_rows} de un total {real_count_rows.iat[0, 0]}")
#logging.info(df_row.to_markdown())
if iteraccion == 0:
parquetName=f"{tmp_path}/{table['table_name']}_{iteraccion}.parquet"
pqwriter = pq.ParquetWriter(parquetName,schema)
tableData = pa.Table.from_pandas(df_row, schema=schema,safe=False, preserve_index=True)
#logging.info(f" tabledata {tableData.column(17)}")
pqwriter.write_table(tableData)
#logging.info(f"parquet name:::{parquetName}")
##pasar a parquet df directo
#df_row.to_parquet(parquetName)
iteraccion=iteraccion+1
count_rows += len(df_row)
del df_row
del tableData
if pqwriter:
print("Cerrando archivo parquet")
pqwriter.close()
del data
del chunksize
del iteraccion
使用 sqlalchemy 和 with 运算符的完整一行代码:
db_engine = sqlalchemy.create_engine(db_url, pool_size=10, max_overflow=20)
with Session(db_engine) as session:
sql_qry = text("Your query")
data = pd.concat(pd.read_sql(sql_qry,session.connection().execution_options(stream_results=True), chunksize=500000), ignore_index=True)
您可以尝试更改 chunksize 来找到适合您情况的最佳尺寸。
您可以使用 chunksize 选项,但如果您有 RAM 问题,需要将其设置为 6-7 位。
对于 pd.read_sql(sql, engine, params = (fromdt, todt,filecode), chunksize=100000) 中的块: df1.append(块) dfs = pd.concat(df1,ignore_index=True)
这样做
如果你想限制输出的行数,只需使用:
data = psql.read_frame(sql, cnxn,chunksize=1000000).__next__()