我正在从 SQL 数据库读取和处理面向行的数据,然后将其作为列式数据写入 Parquet 文件。
在 Python 中转换这些数据很简单。问题是数据集非常大,Python 代码的原始速度是一个实际瓶颈。我的代码花费了大量时间将 Python 字典列表转换为列表字典,以将其提供给 PyArrow 的
ParquetWriter.write_table()
。
读取到的数据是SQLAlchemy和psycopg2。
简化的循环如下所示:
# Note: I am using a trick of preallocated lists here already
columnar_data = {"a": [], "b": []}
for row in get_rows_from_sql():
columnar_data["a"].append(process(row["a"])
columnar_data["b"].append(process(row["b"])
我想做的事:
input_data = get_rows_from_sql()
columnar_input = convert_from_list_of_dicts_to_dict_of_lists_very_fast(input_data)
columnar_output["a"] = map(process, columnar_input["a"])
columnar_output["b"] = map(process, columnar_input["b"])
我希望尽可能多地移动将数据从Python本机转换为CPython内部的循环,以便代码运行得更快。
SQLAlchemy 或 psycopg2 似乎本身并不支持列式数据输出,因为 SQL 是面向行的,但我在这里可能是错的。
我的问题是这里可以应用什么样的Python优化?我认为这是一个非常常见的问题,因为 Pandas 和 Polars 操作的是面向列的数据,而数据输入通常是面向行的,如 SQL。
据我所知,CPython 中没有办法做到这一点。当我还没有真的必须关心性能时,那么我只会使用引擎和
pandas
原始的SQLAlchemy:
with engine.connect() as conn:
q = "SELECT a, b FROM x"
data = conn.execute(q)
df = pd.DataFrame(data, columns=data.keys())
首先要注意的是,我在这里实际上转换的是 tuples 列表,而不是字典列表。如果你得到一个字典列表,那么这就是你在某处配置的东西,我想它会增加开销。您不想在这里做的一件事是使用 ORM 来创建 Python 对象,但我仍然会考虑摆脱 dict 转换。
也就是说,
polars
有pl.read_database()
和pl.read_database_uri()
这可能是最快的方法。这里需要注意的SQLAlchemy
是:
read_database_uri 函数可能明显快于 read_database 如果您使用的是 SQLAlchemy 或 DBAPI2 连接,如下所示 Connectorx 将优化结果集到 Arrow 的转换 Rust 中的格式,而这些库将按行返回数据 在我们可以加载到 Arrow 之前使用 Python。请注意,您可以轻松地 通过以下方式从 SQLAlchemy 引擎对象确定连接的 URI 调用 str(conn.engine.url)。
在这两种方法之间,您应该能够跳过构建任何临时 Python 对象,而只需将数据以表格格式放入 df 中。它不是 CPython,但它保留在 Rust 生态系统中,具有可比性(甚至比第 3 方库更快)。