下面提供了Python代码文件。我在 Linux mint 21.3 上使用 Python 3.10.12(以防需要这些信息)。拥有 2 名工作人员的池比没有任何多重处理的池需要更多时间。我在这里做错了什么?
import multiprocessing
import time
import random
def fun1( x ):
y = 0
for i in range( len( x ) ):
y = y + x[i]
return( y )
def fun2( x ):
p = multiprocessing.Pool( 2 )
y1, y2 = p.map( fun1, [ x[ : len( x ) // 2 ], x[ len( x ) // 2 : ] ] )
y = y1 + y2
return( y )
x = [ random.random() for i in range( 10 ** 6 ) ]
st = time.time()
ans = fun1( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )
st = time.time()
ans = fun2( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )
x = [ random.random() for i in range( 10 ** 7 ) ]
st = time.time()
ans = fun1( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )
st = time.time()
ans = fun2( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )
这是我在终端中得到的内容。
time = 0.043381452560424805, answer = 499936.40420325665.
time = 0.1324300765991211, answer = 499936.40420325927.
time = 0.4444568157196045, answer = 5000677.883536603.
time = 0.8388040065765381, answer = 5000677.883536343.
我还在
if __name__ == '__main__':
之后和其余部分之前使用了 fun2
行,我在终端上得到了相同的结果。我还在 Codio 服务器上尝试了 Python 3.6.2。我也有类似的时间。
time = 0.048882484436035156, answer = 499937.07655266096.
time = 0.15220355987548828, answer = 499937.0765526707.
time = 0.4848289489746094, answer = 4999759.127770024.
time = 1.4035391807556152, answer = 4999759.127769606.
我猜我在代码中所做的事情有问题,是对如何使用
multiprocessing.Pool
而不是Python的误解,但想不出是什么。任何帮助将不胜感激。我预计使用两名工作人员可以将速度提高两倍,而不是降低速度。另外,如果需要的话,我检查了multiprocessing.cpu_count()
,Codio服务器有4个,我的电脑有12个cpu。
当进行多重处理时,您的列表必须被序列化。这是使用 pickle 实现的。当列表的长度为 10**7 时,列表的内存大小为 89,095,160。序列化对象稍大一些,为 90,032,371。该 90MB 对象被写入磁盘(在某个临时位置),然后在执行函数之前被消耗。这是一个相对缓慢的过程。
您可以通过不序列化到磁盘来稍微改善问题。相反,您可以使用共享内存,尽管这仍然不如线性执行那么快 - 即,当直接调用 fun1() 时
这是代码的变体,演示了 3 种技术、打印对象大小并输出各种计时:
import multiprocessing
import time
import random
from multiprocessing.shared_memory import SharedMemory
import pickle
import sys
def fun1(x):
return sum(x)
def fun2(x):
with multiprocessing.Pool() as p:
data = [x[: len(x) // 2], x[len(x) // 2 :]]
return sum(p.map(fun1, data))
def fun3(name):
shm = SharedMemory(name)
try:
x = pickle.loads(shm.buf)
return fun1(x)
finally:
shm.close()
if __name__ == "__main__":
x = [random.random() for i in range(10**7)]
size = sys.getsizeof(x)
print(f"In-memory {size=:,}")
try:
smb = pickle.dumps(x)
size = len(smb)
print(f"Pickled object {size=:,}")
shm = SharedMemory(create=True, size=size)
shm.buf[:size] = smb
for func in fun1, fun2:
st = time.perf_counter()
ans = func(x)
et = time.perf_counter()
print(func.__name__, f"time = {et - st}, answer = {ans}.")
st = time.perf_counter()
ans = fun3(shm.name)
et = time.perf_counter()
print(fun3.__name__, f"time = {et - st}, answer = {ans}.")
finally:
shm.close()
shm.unlink()
输出:
In-memory size=89,095,160
Pickled object size=90,032,371
fun1 time = 0.21793299994897097, answer = 4999444.726535229.
fun2 time = 0.6652624170528725, answer = 4999444.726535082.
fun3 time = 0.4321752089308575, answer = 4999444.726535229.
注:
由于平台和 Python 版本差异,您的结果可能会有所不同。
这里的平台是:
Python 3.13.0
MacOS 15.1.1
Apple M2
嗨修改了你的
fun1
:
import multiprocessing
import time
import random
def fun1( x ):
y = 0
for i in range( len( x ) ):
if 0.3 < x[i] < 0.5:
y = y + x[i]
elif x[i] >= 0.5:
y += x[i]**2
else:
y += 1
return( y )
def fun2( x ):
p = multiprocessing.Pool( 2 )
y1, y2 = p.map( fun1, [ x[ : len( x ) // 2 ], x[ len( x ) // 2 : ] ] )
y = y1 + y2
return( y )
x = [ random.random() for i in range( 10 ** 6 ) ]
st = time.time()
ans = fun1( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )
st = time.time()
ans = fun2( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )
x = [ random.random() for i in range( 10 ** 7 ) ]
st = time.time()
ans = fun1( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )
st = time.time()
ans = fun2( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )
输出,对我来说似乎可以重现(我有一个小CPU):
time = 0.5544326305389404, answer = 671883.5322289083.
time = 0.5338046550750732, answer = 671883.5322288958.
time = 4.955313205718994, answer = 6715366.44807873.
time = 3.6769814491271973, answer = 6715366.448078716.
#anoter try:
time = 0.5013079643249512, answer = 671620.2326088196.
time = 0.39491748809814453, answer = 671620.2326088334.
time = 4.901585578918457, answer = 6717295.294786744.
time = 3.776726484298706, answer = 6717295.294786792.
#next:
time = 0.4863710403442383, answer = 671647.0603678812.
time = 0.6218230724334717, answer = 671647.0603678576.
time = 5.027068376541138, answer = 6716671.888051354.
time = 3.752805471420288, answer = 6716671.888051517.
我认为这与通过Python示例理解时间复杂度
有关但我可能是错的,我也在努力学习。
附录:
根据之前的好答案,该答案已被删除(不知道为什么):
使用
Multiprocessing.Pool().map
与 Python 中的多处理 - 所有关于酸洗
Python 中的多重处理非常灵活。您可以根据需要定义流程并编排它们,或者使用一种优秀的方法来聚集流程池。默认情况下,Pool 假定进程数等于 CPU 核心数,但您可以通过传递进程参数来更改它。 Pool 中包含的主要方法是 apply 和 map,它们分别允许您使用任意参数运行进程或执行并行映射。还有这些的异步版本,即 apply_async 和 map_async.Simple,对吧?是的,这就是所需要的。现在,去使用多处理吧! 事实上,在你离开去追寻你的梦想之前,有一个小警告。当执行进程时,Python 首先 pickles 这些方法。这会产生瓶颈,因为只有 pickle 的对象才会传递给进程。此外,Pool 不允许并行化引用运行它们的 pool 实例的对象........................................ …………………………………… ...................................................... ...................................................... ...................... 是的,这都是因为Pickle。你能用它做什么?从某种意义上说,并不算多,因为它是默认的包含电池的解决方案。另一方面,泡菜通常很慢
删除答案:
您正在复制相当大量的数据(> 90MB),因此
fun2
消耗了大量时间进行数据复制(序列化/反序列化、进程间通信),
>>> import pickle
>>> import random
>>> datea = [random.random() for i in range(10 ** 7)]
>>> len(pickle.dumps(data)
90032371
您可以将数据存储在多个文件中,并将文件名分发给工作人员,而不是直接传递大数据。
您可能希望使用并行计算框架(例如 Polars、Dask 和 Ray),具体取决于您的工作负载。