我正在使用 multiprocessing.pool
apply_async
函数通过计算机视觉算法处理一些视频数据,由于算法没有内存,当前加速处理的策略是将视频分割成更小的部分,并且并行处理它们。
为了并行化该过程,相关函数被打包成“worker”函数,然后在 multiprocessing.pool.Pool 类的 apply_async 方法中使用。由于 apply_async 大致按完成顺序返回结果,并且视频序列必须保持原始顺序,因此在池关闭并加入后,各种输出将重新排序。这是通过为每个工作函数分配一个连续的标识号来完成的,该标识号与结果数据数组一起打包成一个元组作为工作函数的输出。
使用此标识号,回调函数将输出数据数组分配给先前初始化列表中具有相应索引的空项。
代码如下:
import pandas
import multiprocessing.pool
import traceback
def myFunction(video_file_path, frame_range) -> pandas.DataFrame:
...
return data_frame
def worker(video_file_path, frame_range, process_nr):
try:
result = myFunction(video_file_path, frame_range)
return process_nr, result
except Exception as ex: # the following error handling should not matter, but...
tb = sys.exception().__traceback__
print(f"error @ worker {process_nr} on {video_file_path}")
print(f"Exception {type(ex)}: {ex}\nTraceback"
f"\n{''.join(traceback.format_tb(tb))}")
return process_nr, None
def log_result(result):
result_list[result[0]] = result[1]
if __name__ == '__main__':
pool_size = 8
frame_ranges = [...] # handled by some slicing function
result_list = [None] * len(frame_ranges)
with multiprocessing.pool.Pool(pool_size) as pool:
for n, fcnk in enumerate(frame_ranges):
pool.apply_async(worker, args=("C:\\some\\video\\path", fcnk, n),
callback=log_result)
pool.close()
pool.join()
results_dataframe = result_list[0]
for n in range(1, len(result_list)):
results_dataframe = pandas.merge(results_dataframe, result_list[n], how='outer')
results_dataframe.to_csv("C:\\some\\save\\path","some_save_name.csv")
...并且它在我的 Windows 10 台式电脑上的 PyCharm IDE 中运行得非常好,具有 Python 3.11.6 conda 环境。它的行为完全符合预期: 结果保存在 .csv 文件中,每一行对应一帧的数据,并且所有行/帧在最终 csv 文件中都按正确的顺序排列。
但是,当我在运行 Ubuntu 的远程集群上运行它时,结果完全混乱,一个帧切片中的行/帧与最终 csv 文件中另一个帧切片的行/帧混合在一起。
这是由于在未分配列表上同时写入而导致的一些荒谬的竞争条件,还是我的代码中的其他错误/疏忽?我目前正在远程服务器上运行一些测试,但我只有一个文本终端,这需要一些时间。
不会立即发现代码有任何问题,但有一个更简单、更安全的解决方案。
您想要在一组有序输入
myFunction
上同时执行一个frame_ranges
函数(我们假设它是线程安全的,您应该检查这一点)。此外,您希望结果与 frame_ranges
的输入列表保持相同的顺序。
为此,您可以使用
pool.map()
方法。这看起来像这样:
import multiprocessing.pool
import sys
import traceback
from functools import partial
import pandas
def myFunction(frame_range, video_file_path) -> pandas.DataFrame:
...
return data_frame
def worker(frame_range, video_file_path):
try:
result = myFunction(frame_range, video_file_path)
return result
except Exception as ex: # the following error handling should not matter, but...
tb = sys.exception().__traceback__
print(f"error @ worker on {video_file_path}")
print(f"Exception {type(ex)}: {ex}\nTraceback"
f"\n{''.join(traceback.format_tb(tb))}")
return None
if __name__ == '__main__':
pool_size = 8
frame_ranges = [...] # handled by some slicing function
_worker = partial(worker, video_file_path="C:\\some\\video\\path")
with multiprocessing.pool.Pool(pool_size) as pool:
result_list = pool.map(_worker, frame_ranges)
results_dataframe = result_list[0]
for n in range(1, len(result_list)):
results_dataframe = pandas.merge(results_dataframe, result_list[n], how='outer')
results_dataframe.to_csv("C:\\some\\save\\path","some_save_name.csv")
pool.map()
方法会自动处理有序列表中的聚集或结果,您不必再自己做。另外,您不必 close
或 join
池,因为这是由 with
块自动处理的。我重新排序了函数的参数,以便能够使用 partial
来冻结一些常量输入参数(视频文件路径)。
如果
myFunction
函数是线程安全的,这应该会给你一个正确的结果。