Python multiprocessing.pool apply_async 回调函数无法重新排序输出数据

问题描述 投票:0回答:1

我正在使用 multiprocessing.pool

apply_async
函数通过计算机视觉算法处理一些视频数据,由于算法没有内存,当前加速处理的策略是将视频分割成更小的部分,并且并行处理它们。

为了并行化该过程,相关函数被打包成“worker”函数,然后在 multiprocessing.pool.Pool 类的 apply_async 方法中使用。由于 apply_async 大致按完成顺序返回结果,并且视频序列必须保持原始顺序,因此在池关闭并加入后,各种输出将重新排序。这是通过为每个工作函数分配一个连续的标识号来完成的,该标识号与结果数据数组一起打包成一个元组作为工作函数的输出。

使用此标识号,回调函数将输出数据数组分配给先前初始化列表中具有相应索引的空项。

代码如下:

import pandas
import multiprocessing.pool
import traceback


def myFunction(video_file_path, frame_range) -> pandas.DataFrame:
    ...
    return data_frame


def worker(video_file_path, frame_range, process_nr):
    try:
        result = myFunction(video_file_path, frame_range)
        return process_nr, result
    except Exception as ex:  # the following error handling should not matter, but...
        tb = sys.exception().__traceback__
        print(f"error @ worker {process_nr} on {video_file_path}")
        print(f"Exception {type(ex)}: {ex}\nTraceback"
              f"\n{''.join(traceback.format_tb(tb))}")
        return process_nr, None


def log_result(result):
    result_list[result[0]] = result[1]


if __name__ == '__main__':
    pool_size = 8
    frame_ranges = [...]  # handled by some slicing function
    result_list = [None] * len(frame_ranges)

    with multiprocessing.pool.Pool(pool_size) as pool:
        for n, fcnk in enumerate(frame_ranges):
            pool.apply_async(worker, args=("C:\\some\\video\\path", fcnk, n),
                             callback=log_result)
        pool.close()
        pool.join()

    results_dataframe = result_list[0]
    for n in range(1, len(result_list)):
        results_dataframe = pandas.merge(results_dataframe, result_list[n], how='outer')
    results_dataframe.to_csv("C:\\some\\save\\path","some_save_name.csv")
    

...并且它在我的 Windows 10 台式电脑上的 PyCharm IDE 中运行得非常好,具有 Python 3.11.6 conda 环境。它的行为完全符合预期: 结果保存在 .csv 文件中,每一行对应一帧的数据,并且所有行/帧在最终 csv 文件中都按正确的顺序排列。

但是,当我在运行 Ubuntu 的远程集群上运行它时,结果完全混乱,一个帧切片中的行/帧与最终 csv 文件中另一个帧切片的行/帧混合在一起。

这是由于在未分配列表上同时写入而导致的一些荒谬的竞争条件,还是我的代码中的其他错误/疏忽?我目前正在远程服务器上运行一些测试,但我只有一个文本终端,这需要一些时间。

python python-3.x multiprocessing python-multiprocessing threadpool
1个回答
0
投票

不会立即发现代码有任何问题,但有一个更简单、更安全的解决方案。

您想要在一组有序输入

myFunction
上同时执行一个
frame_ranges
函数(我们假设它是线程安全的,您应该检查这一点)。此外,您希望结果与 frame_ranges 的输入列表保持
相同的顺序

为此,您可以使用

pool.map()
方法。这看起来像这样:

import multiprocessing.pool
import sys
import traceback
from functools import partial

import pandas


def myFunction(frame_range, video_file_path) -> pandas.DataFrame:
    ...
    return data_frame


def worker(frame_range, video_file_path):
    try:
        result = myFunction(frame_range, video_file_path)
        return result
    except Exception as ex:  # the following error handling should not matter, but...
        tb = sys.exception().__traceback__
        print(f"error @ worker on {video_file_path}")
        print(f"Exception {type(ex)}: {ex}\nTraceback"
              f"\n{''.join(traceback.format_tb(tb))}")
        return None


if __name__ == '__main__':
    pool_size = 8
    frame_ranges = [...]  # handled by some slicing function

    _worker = partial(worker, video_file_path="C:\\some\\video\\path")

    with multiprocessing.pool.Pool(pool_size) as pool:
        result_list = pool.map(_worker, frame_ranges)

    results_dataframe = result_list[0]
    for n in range(1, len(result_list)):
        results_dataframe = pandas.merge(results_dataframe, result_list[n], how='outer')
    results_dataframe.to_csv("C:\\some\\save\\path","some_save_name.csv")

pool.map()
方法会自动处理有序列表中的聚集或结果,您不必再自己做。另外,您不必
close
join
池,因为这是由
with
块自动处理的。我重新排序了函数的参数,以便能够使用
partial
来冻结一些常量输入参数(视频文件路径)。

如果

myFunction
函数是线程安全的,这应该会给你一个正确的结果。

© www.soinside.com 2019 - 2024. All rights reserved.