我正在处理一个大型数据集(大约 100 万条记录),用 Python 表示为字典列表。每个字典都有多个字段,我需要根据几个条件过滤数据,然后处理过滤后的结果。主要挑战是数据集太大,无法一次全部装入内存,我需要一种有效的解决方案,以内存意识的方式过滤和处理数据。
这是我想要实现的目标的简化版本:
过滤年龄 > 25 且状态 == '活跃' 的记录。 对于过滤后的记录,提取某些字段,例如姓名和电子邮件,并对其进行处理(例如,将姓名转换为小写,从电子邮件中提取域名)。
# Sample dataset
data = [
{'name': 'Alice', 'age': 30, 'status': 'active', 'email': '[email protected]'},
{'name': 'Bob', 'age': 22, 'status': 'inactive', 'email': '[email protected]'},
{'name': 'Charlie', 'age': 35, 'status': 'active', 'email': '[email protected]'},
# More records...
]
# Attempted approach
def process_record(record):
# Process the record, e.g., lowercase name, extract email domain
record['name'] = record['name'].lower()
record['email_domain'] = record['email'].split('@')[1]
return record
filtered_and_processed = []
for record in data:
if record['age'] > 25 and record['status'] == 'active':
processed_record = process_record(record)
filtered_and_processed.append(processed_record)
# Output the results
print(filtered_and_processed)
处理这个问题的最佳方法是对记录流使用迭代,而不是将它们全部聚合到内存中。有几种方法可以做到这一点。
您可以通过使用
csv
模块并直接迭代文件句柄来完成此操作:
import csv
with open('yourfile.csv', newline='') as infile, open('outfile.csv', 'w', newline='') as outfile:
reader = csv.DictReader(infile)
writer = csv.DictWriter(outfile)
# Iterate directly over the reader
for row in reader:
if row['age'] < 25 and not row['status'] == 'active':
continue
# Write each row, don't aggregate
writer.writerow(process_record(row))
这将处理流中的所有记录,并且内存效率很高。
pandas
批处理:import pandas as pd
with pd.read_csv('yourfile.csv', chunksize=10**6) as reader:
for chunk in reader:
df = chunk[(chunk['age'] > 25) & (chunk['status'] == 'active')].apply(process_record)
df.to_csv('outfile.csv', mode='a', index=False, header=True)