[![Example plot of smoothed vs. unsmoothed data][1]][1]

```
import os
from multiprocessing import Pool

import pandas as pd


def smooth_data_mp(data_frame, n):
    # data_frame is the data, n is the number of data points the smoothing function looks at
    num_processes = os.cpu_count() + 2
    chunk_size = len(data_frame.index) // num_processes
    print(chunk_size)
    end_chunk = len(data_frame) // chunk_size - 1  # have to subtract 1 since indexing starts at 0 -- this line took an hour to debug, my god
    with Pool(processes=num_processes) as pool:
        results = pool.starmap(
            process_data,
            [(data_frame, i, chunk_size, n, end_chunk)
             for i in range(len(data_frame) // chunk_size)],
        )
    return pd.concat(results)


def process_data(dataframe, i, chunk_size, n, end_chunk):
    fraction = n / chunk_size  # n is the number of data points the smoothing function looks at
    # pad each chunk with n extra rows on either side so the loess fits match up at the seams
    if i == 0:
        start_frame = 0
    else:
        start_frame = chunk_size * i - n
    if i == end_chunk:
        end_frame = len(dataframe)  # make sure end_frame doesn't run past the end of the data
    else:
        end_frame = chunk_size * (i + 1) + n
    new_data_frame = calculate_loess_on_subset(dataframe[start_frame:end_frame], fraction, i, n, end_chunk)
    # re-index the smoothed chunk back to its un-padded position in the original frame
    start_index = chunk_size * i
    if i == end_chunk:
        end_index = len(dataframe)
    else:
        end_index = chunk_size * (i + 1)
    new_data_frame.index = pd.RangeIndex(start_index, end_index)
    return new_data_frame
```
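(`calculate_loess_on_subset` isn't shown here. A minimal stand-in built on statsmodels' `lowess` might look like the sketch below; the `x` and `y` column names are assumptions on my part, and the one hard requirement is that it drops the `n` padding rows again so the re-indexing in `process_data` lines up.)

```
from statsmodels.nonparametric.smoothers_lowess import lowess

def calculate_loess_on_subset(subset, fraction, i, n, end_chunk):
    # hypothetical stand-in: smooth 'y' against 'x' (column names assumed),
    # then trim the n padding rows so the chunk fits its un-padded index range
    out = subset.copy()
    out['y_smooth'] = lowess(subset['y'], subset['x'], frac=fraction, return_sorted=False)
    first = 0 if i == 0 else n                           # first chunk has no left padding
    last = len(out) if i == end_chunk else len(out) - n  # last chunk has no right padding
    return out.iloc[first:last]
```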
How can I chop up the data the way I currently do inside `process_data`, but *before* it's sent to each process? As it stands, every task gets a pickled copy of the whole dataframe, so I'm running into scaling issues with more processes and larger dataframes because of the memory overhead.
The reason I'm padding the data is that I'm running a smoothing function, and if I don't add the extra `n` data points to each side of a chunk (can't do that on the outer side of the first and last chunks, but that's OK), I end up with gaps where the smoothing doesn't match up, because the loess doesn't take into account the points just outside its own chunk. For example, with `chunk_size = 1000` and `n = 50`, chunk 1 is smoothed over rows 950–2049 even though it only keeps rows 1000–1999.
[1]: https://i.stack.imgur.com/0znn1.png
I haven't actually implemented this yet. However, moving the if/elif logic outside of `process_data` should let me split the data into a list of lists, where each sublist is one chunk, and change the for loop in the multiprocessing pool definition to `for chunk in chunks`. Then all I'd need to do is pass each chunk's index along and rebuild everything afterwards.
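Roughly what I have in mind (untested sketch; `total_len` is a new argument so the last chunk can still be re-indexed without access to the full frame):

```
def smooth_data_mp(data_frame, n):
    num_processes = os.cpu_count() + 2
    chunk_size = len(data_frame.index) // num_processes
    end_chunk = len(data_frame) // chunk_size - 1

    # do the padded slicing up front, so each worker is only sent its own
    # slice instead of a pickled copy of the entire dataframe
    chunks = []
    for i in range(end_chunk + 1):
        start = 0 if i == 0 else chunk_size * i - n
        end = len(data_frame) if i == end_chunk else chunk_size * (i + 1) + n
        chunks.append(data_frame.iloc[start:end])

    with Pool(processes=num_processes) as pool:
        results = pool.starmap(
            process_data,
            [(chunk, i, chunk_size, n, end_chunk, len(data_frame))
             for i, chunk in enumerate(chunks)],
        )
    return pd.concat(results)


def process_data(chunk, i, chunk_size, n, end_chunk, total_len):
    # the chunk arrives pre-padded, so all that's left is smoothing and re-indexing
    fraction = n / chunk_size
    new_data_frame = calculate_loess_on_subset(chunk, fraction, i, n, end_chunk)
    start_index = chunk_size * i
    end_index = total_len if i == end_chunk else chunk_size * (i + 1)
    new_data_frame.index = pd.RangeIndex(start_index, end_index)
    return new_data_frame
```

Each task should then only pickle `chunk_size + 2n` rows instead of the whole frame, which is where the memory overhead was coming from.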
If I get it working at all, I'll come back and update this thread with the code.