我正在处理一个 Python 批处理任务,该任务涉及读取大型 CSV 文件(最多 500 万行)并对每一行执行各种操作。尽管优化了数据库查询并利用了多重处理,但对于 500 万行文件,总处理时间仍然约为 3 小时。
使用 pandas 读取和预处理 CSV 文件。使用 Python 的多处理模块将文件分为多个块以进行并行处理。
数据库有超过 100 万行的表。查询经过优化并总共在约 20 分钟内执行。使用索引表和高效联接。
CSV 文件中的每一行都会经过验证和转换。有些行涉及数据库中的附加逻辑或查找。由于行级处理逻辑,受 CPU 限制的操作占主导地位。
剩余的约 2.5 小时用于 Python 中的行级处理。 对于较小的文件(例如 100k 行),大约需要 30 分钟,这仍然太慢了。
由于我不知道您的内存容量和您分配的批处理大小,因此我们定义两个变量以提高灵活性并平衡内存使用:
创建一个线程池来处理 CSV 文件读取:用 4 个线程初始化它,并允许其扩展到最多 8 个线程。这将有效管理 IO 操作,而不会占用系统资源。 设置每个 CSV 块的大小:鉴于本地 IO 速度通常很快,您可以将块大小设置为 50,000 行。即使对于多达 100 万行的数据集,这也应确保最小的 IO 开销。 即使对于 100 万行这样的大型数据集,本地 IO 操作也应该花费很少的时间。核心瓶颈似乎在于数据库处理和您已实现的一些自定义逻辑。在不了解逻辑复杂性的情况下,这里有一些专门用于优化数据库处理的建议:
建立数据库连接池:这允许您重用连接,而不是重复建立和拆除连接,这可以节省大量时间。 基于公共字段的批量查询:如果您的操作主要涉及查询,请寻找您正在查询的字段之间的共性。如果索引设置得当,MySQL 可以非常快速地读取数千甚至数万条记录。将类似的查询批量处理在一起,以减少单个数据库调用的数量。 您可以考虑这些建议,尽管它们非常笼统,因为我没有有关您的设置的具体细节。基于这些点进行优化应该有助于提高性能。