Python 中的多处理持久队列顺序

问题描述 投票:0回答:1

我正在尝试跨多个处理器运行黄土回归,但是当我的代码将每个并发数据帧添加到队列中时,它们全部乱序,并且生成的图表看起来很糟糕。

def smooth_data_mp(data_frame):
    num_processes = 8
    chunk_size = 125000
    fraction = 125 / chunk_size
    print(data_frame.head())
    result_queue = Manager().Queue()
    with Pool(processes=num_processes) as pool:
        pool.starmap(process_data, [(data_frame, i, chunk_size, fraction, result_queue) for i in
                                    range(len(data_frame) // chunk_size)])

    # Collect results from the queue in order
    result_list = pd.DataFrame(result_queue.get())
    while not result_queue.empty():
        result_list = pd.concat([result_list, result_queue.get()])
    return result_list

def process_data(dataframe, i, chunk_size, fraction, result_queue):
    start_frame = chunk_size * i
    end_frame = min(chunk_size * (i + 1), len(dataframe))  # Ensure end_frame doesn't exceed length of sampleData
    print(f'{start_frame}, {end_frame}') # just for debugging
    new_data_frame = calculate_loess_on_subset(dataframe[start_frame:end_frame], chunk_size, fraction, i)
    result_queue.put(new_data_frame)

如何确保在 process_data 函数中添加到队列中的每个数据帧都是按照原始数据集中出现的顺序添加的,而不是仅在进程完成时添加?

我尝试过使用不同的队列类型,例如常规队列和管理器队列,但只有管理器有效......但我不确定如何解决问题。

python multiprocessing
1个回答
0
投票

问题是,一旦新的数据帧可用,您就会将这些数据帧追加到队列中。但你不需要队列,你可以只使用

starmap
:

的返回值
with Pool(processes=num_processes) as pool:
    results = pool.starmap(process_data, 
         [(data_frame, i, chunk_size, fraction) 
          for i in range(len(data_frame) // chunk_size)])
return pd.concat(results)

这将保留输入数据帧的原始顺序。 然后

process_data
函数应修改为:

def process_data(dataframe, i, chunk_size, fraction):
    ...similar code as original...
    return new_data_frame

(如果由于某种原因您确实也想保留输出结果队列,那么确保排序的最简单方法可能就是确保向处理后的帧添加显式行索引,最后您需要对该索引上的合并帧进行排序。) (另请参阅 multiprocessing.managers.SyncManager 中的示例“测试代码”)

© www.soinside.com 2019 - 2024. All rights reserved.