When I use Python's multiprocessing.pool.Pool with more than 63 cores, I get a ValueError:
from multiprocessing.pool import Pool

def f(x):
    return x

if __name__ == '__main__':
    with Pool(70) as pool:
        arr = list(range(70))
        a = pool.map(f, arr)
        print(a)
Output:
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Users\fischsam\Anaconda3\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Users\fischsam\Anaconda3\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\fischsam\Anaconda3\lib\multiprocessing\pool.py", line 519, in _handle_workers
    cls._wait_for_updates(current_sentinels, change_notifier)
  File "C:\Users\fischsam\Anaconda3\lib\multiprocessing\pool.py", line 499, in _wait_for_updates
    wait(sentinels, timeout=timeout)
  File "C:\Users\fischsam\Anaconda3\lib\multiprocessing\connection.py", line 879, in wait
    ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
  File "C:\Users\fischsam\Anaconda3\lib\multiprocessing\connection.py", line 811, in _exhaustive_wait
    res = _winapi.WaitForMultipleObjects(L, False, timeout)
ValueError: need at most 63 handles, got a sequence of length 72
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
The program seems to run fine; the result is exactly what I expected. Can I just ignore the ValueError?
Background: I googled the issue, and it seems to be related to a limitation of Python on Windows (_winapi.WaitForMultipleObjects wraps a Win32 API that can wait on at most 64 handles at a time); see e.g. here. The suggested fix is to limit the number of cores used to 63. That is not satisfactory, because I want to use the 100+ cores on my server. Do I really need to limit the number of cores? Why? Is there a workaround?
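For reference, the suggested cap would look roughly like the sketch below (safe_pool_size is a name I made up for illustration; 61 is the limit that concurrent.futures itself enforces on Windows):

import os
from multiprocessing.pool import Pool

# Hypothetical helper: cap the pool size on Windows, where the underlying
# WaitForMultipleObjects call cannot wait on more than 64 handles at once.
def safe_pool_size(requested):
    if os.name == 'nt':
        return min(requested, 61)  # the cap concurrent.futures uses on Windows
    return requested

def f(x):
    return x

if __name__ == '__main__':
    with Pool(safe_pool_size(70)) as pool:
        print(pool.map(f, range(70)))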
Interestingly, when I used ProcessPoolExecutor(max_workers=70) from the concurrent.futures module to see whether the problem persists, I got ValueError: max_workers must be <= 61, and the program terminated immediately, before any jobs were submitted. This strongly suggests a Windows limitation that cannot be circumvented. Your program, however, never actually terminates; it just hangs after the exception is raised. On my 8-core machine, it hangs whenever I specify more than 60 workers (not 61, not 63), regardless of whether I use multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor. Find the maximum number of workers that lets your machine terminate normally without raising an exception, and stick with that.
from concurrent.futures import ProcessPoolExecutor

def f(x):
    return x

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=70) as executor:
        a = list(executor.map(f, range(70)))
        print(a)
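If you need more than ~60 worker processes in total, one possible workaround (my own sketch, not part of this answer) is to split the work across several pools, each small enough to stay under the per-pool handle limit, and run them concurrently from threads:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from itertools import chain

def f(x):
    return x

def run_chunk(chunk):
    # Each executor's management machinery waits only on the handles of its
    # own workers, so keeping every pool at 50 workers stays well under the
    # Windows limit even while several pools run at the same time.
    with ProcessPoolExecutor(max_workers=50) as executor:
        return list(executor.map(f, chunk))

if __name__ == '__main__':
    data = list(range(140))
    chunks = [data[:70], data[70:]]
    # Two pools of 50 workers each, i.e. 100 worker processes in total,
    # driven concurrently from two threads.
    with ThreadPoolExecutor(max_workers=len(chunks)) as threads:
        results = list(chain.from_iterable(threads.map(run_chunk, chunks)))
    print(results)

Whether this actually scales to 100+ cores on your server is something you would have to verify; it only avoids putting more than ~60 handles into any single wait call.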
I wrote a tool that provides object-based parallelism with an arbitrary number of parallel processes.
(Object-based parallelism: each parallel process holds a copy of an object, and the object's methods can be called without reinitializing the object.)
The package is available on PyPI. You can use the tool to do things similar to classic multiprocessing:
import os
from objectproxypool import ProxyPool
import numpy as np

# Define a dummy class containing only the function
# you are interested in
class MyClass:
    # Implement the function you are interested in.
    # Static methods are not supported.
    # You may just ignore the self argument.
    def f(self, x):
        return os.getpid()

if __name__ == '__main__':
    # Create a proxy pool with 70 workers in independent processes.
    # If `separateProcesses = False`, the workers will be threads.
    with ProxyPool(MyClass, numWorkers=70, separateProcesses=True) as myObject:
        # myObject is a proxy to the object of type MyClass.
        # `map_args = True` assures that each worker gets one argument at a time.
        # Otherwise, each worker will receive the argument `range(100)`
        a = myObject.f(range(100), map_args=True)
        print("We used {} different processes.".format(np.unique(a).size))
        print(a)
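Assuming the PyPI distribution name matches the import name, the package can be installed with pip install objectproxypool.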