给定的问题:我有从folder1到folder999命名的文件夹。在每个文件夹中都有实木复合地板文件-从1.parquet到999.parquet命名。每个实木复合地板由给定结构的熊猫数据框组成:
id |title |a
1 |abc |1
1 |abc |3
1 |abc |2
2 |abc |1
... |def | ...
其中列a可以是范围a1到a3的值。
partial步骤是获取结构:
id | title | a1 | a2 | a3
1 | abc | 1 | 1 | 1
2 | abc | 1 | 0 | 0
...
为了获得最终表格,:
title
id | abc | def | ...
1 | 3 | ... |
2 | 1 | ... |
其中列abc的值是列a1,a2和a3的总和。
目标是获得对所有文件夹中的所有镶木地板文件计算出的最终形式。
现在,我现在的处境是这样的:我确实知道如何分步接收最终表格,例如通过使用sparse.coo_matrix(),如How to make full matrix from dense pandas dataframe中所述。
问题是:由于内存限制,我不能简单地一次读取所有实木复合地板。
我有三个问题:
如果我有大量数据(假设每个实木复合地板文件包含500MB),如何高效到达那里?
我可以将每个实木复合地板分别转换成最终形式,然后以某种方式合并它们吗?如果可以,我该怎么办?
是否有任何方法可以跳过partial步骤?
对于文件中的每个数据框,您似乎都
id
,title
列分组数据>a
列中的数据求和没有必要为任务创建完整的矩阵,因此partial
步骤也是如此。
[我不确定文件中存在id
,title
的唯一组合和/或全部。安全的步骤是分批处理文件,保存文件结果,然后合并所有结果
看起来像,
import pandas as pd
import numpy as np
import string
def gen_random_data(N, M):
# N = 100
# M = 10
titles = np.apply_along_axis(lambda x: ''.join(x), 1, np.random.choice(list(string.ascii_lowercase), 3*M).reshape(-1, 3))
titles = np.random.choice(titles, N)
_id = np.random.choice(np.arange(M) + 1, N)
val = np.random.randint(M, size=(N,))
df = pd.DataFrame(np.vstack((_id, titles, val)).T, columns=['id', 'title', 'a'])
df = df.astype({'id': np.int64, 'title': str, 'a': np.int64})
return df
def combine_results(grplist):
# stitch into one dataframe
comb_df = pd.concat(dflist, axis=1)
# Sum over common axes i.e. id, titles
comb_df = comb_df.apply(lambda row: np.nansum(row), axis=1)
# Return a data frame with sum of a's
return comb_df.to_frame('sum_of_a')
totalfiles = 10
batch = 2
filelist = []
for counter,nfiles in enumerate(range(0, totalfiles, batch)):
# Read data from files. generate random data
dflist = [gen_random_data(100, 2) for _ in range(nfiles)]
# Process the data in memory
dflist = [_.groupby(['id', 'title']).agg(['sum']) for _ in dflist]
collection = combine_results(dflist)
# write intermediate results to file and repeat the process for the rest of the files
intermediate_result_file_name = f'resfile_{counter}'
collection.to_parquet(intermediate_result_file_name, index=True)
filelist.append(intermediate_result_file_name)
# Combining result files.
collection = [pd.read_parquet(file) for file in filelist]
totalresult = combine_results(collection)