我使用了 2 个工作池来通过 Python 多处理在函数中分割总和,但时间并没有加快,是我遗漏了什么吗?

问题描述 投票:0回答:2

下面提供了Python代码文件。我在 Linux mint 21.3 上使用 Python 3.10.12(以防需要这些信息)。拥有 2 名工作人员的池比没有任何多重处理的池需要更多时间。我在这里做错了什么?

import multiprocessing
import time
import random

def fun1( x ):
    y = 0
    for i in range( len( x ) ):
        y = y + x[i]
    return( y )

def fun2( x ):
    p = multiprocessing.Pool( 2 )
    y1, y2 = p.map( fun1, [ x[ : len( x ) // 2 ], x[ len( x ) // 2 : ] ] )
    y = y1 + y2
    return( y )

x = [ random.random() for i in range( 10 ** 6 ) ]

st = time.time()
ans = fun1( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )

st = time.time()
ans = fun2( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )

x = [ random.random() for i in range( 10 ** 7 ) ]

st = time.time()
ans = fun1( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )

st = time.time()
ans = fun2( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )

这是我在终端中得到的内容。

time = 0.043381452560424805, answer = 499936.40420325665.
time = 0.1324300765991211, answer = 499936.40420325927.
time = 0.4444568157196045, answer = 5000677.883536603.
time = 0.8388040065765381, answer = 5000677.883536343.

我还在

if __name__ == '__main__':
之后和其余部分之前使用了
fun2
行,我在终端上得到了相同的结果。我还在 Codio 服务器上尝试了 Python 3.6.2。我也有类似的时间。

time = 0.048882484436035156, answer = 499937.07655266096.
time = 0.15220355987548828, answer = 499937.0765526707.
time = 0.4848289489746094, answer = 4999759.127770024.
time = 1.4035391807556152, answer = 4999759.127769606.

我猜我在代码中所做的事情有问题,是对如何使用

multiprocessing.Pool
而不是Python的误解,但想不出是什么。任何帮助将不胜感激。我预计使用两名工作人员可以将速度提高两倍,而不是降低速度。另外,如果需要的话,我检查了
multiprocessing.cpu_count()
,Codio服务器有4个,我的电脑有12个cpu。

python python-3.x parallel-processing multiprocessing python-multiprocessing
2个回答
1
投票

当进行多重处理时,您的列表必须被序列化。这是使用 pickle 实现的。当列表的长度为 10**7 时,列表的内存大小为 89,095,160。序列化对象稍大一些,为 90,032,371。该 90MB 对象被写入磁盘(在某个临时位置),然后在执行函数之前被消耗。这是一个相对缓慢的过程。

您可以通过不序列化到磁盘来稍微改善问题。相反,您可以使用共享内存,尽管这仍然不如线性执行那么快 - 即,当直接调用 fun1()

这是代码的变体,演示了 3 种技术、打印对象大小并输出各种计时:

import multiprocessing
import time
import random
from multiprocessing.shared_memory import SharedMemory
import pickle
import sys


def fun1(x):
    return sum(x)


def fun2(x):
    with multiprocessing.Pool() as p:
        data = [x[: len(x) // 2], x[len(x) // 2 :]]
        return sum(p.map(fun1, data))


def fun3(name):
    shm = SharedMemory(name)
    try:
        x = pickle.loads(shm.buf)
        return fun1(x)
    finally:
        shm.close()


if __name__ == "__main__":
    x = [random.random() for i in range(10**7)]
    size = sys.getsizeof(x)
    print(f"In-memory {size=:,}")
    try:
        smb = pickle.dumps(x)
        size = len(smb)
        print(f"Pickled object {size=:,}")
        shm = SharedMemory(create=True, size=size)
        shm.buf[:size] = smb
        for func in fun1, fun2:
            st = time.perf_counter()
            ans = func(x)
            et = time.perf_counter()
            print(func.__name__, f"time = {et - st}, answer = {ans}.")
        st = time.perf_counter()
        ans = fun3(shm.name)
        et = time.perf_counter()
        print(fun3.__name__, f"time = {et - st}, answer = {ans}.")
    finally:
        shm.close()
        shm.unlink()

输出:

In-memory size=89,095,160
Pickled object size=90,032,371
fun1 time = 0.21793299994897097, answer = 4999444.726535229.
fun2 time = 0.6652624170528725, answer = 4999444.726535082.
fun3 time = 0.4321752089308575, answer = 4999444.726535229.

注:

由于平台和 Python 版本差异,您的结果可能会有所不同。

这里的平台是:

Python 3.13.0
MacOS 15.1.1
Apple M2

0
投票

嗨修改了你的

fun1

import multiprocessing
import time
import random

def fun1( x ):
    y = 0
    for i in range( len( x ) ):
        
        if 0.3 < x[i] < 0.5:
            y = y + x[i]
            
        
        elif x[i] >= 0.5:
            
            y += x[i]**2
            
        else:
            
            y += 1
        
    return( y )

def fun2( x ):
    p = multiprocessing.Pool( 2 )
    y1, y2 = p.map( fun1, [ x[ : len( x ) // 2 ], x[ len( x ) // 2 : ] ] )
    y = y1 + y2
    return( y )

x = [ random.random() for i in range( 10 ** 6 ) ]

st = time.time()
ans = fun1( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )

st = time.time()
ans = fun2( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )

x = [ random.random() for i in range( 10 ** 7 ) ]

st = time.time()
ans = fun1( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )

st = time.time()
ans = fun2( x )
et = time.time()
print( f"time = {et - st}, answer = {ans}." )

输出,对我来说似乎可以重现(我有一个小CPU):

time = 0.5544326305389404, answer = 671883.5322289083.
time = 0.5338046550750732, answer = 671883.5322288958.
time = 4.955313205718994, answer = 6715366.44807873.
time = 3.6769814491271973, answer = 6715366.448078716.

#anoter try:

time = 0.5013079643249512, answer = 671620.2326088196.
time = 0.39491748809814453, answer = 671620.2326088334.
time = 4.901585578918457, answer = 6717295.294786744.
time = 3.776726484298706, answer = 6717295.294786792.

#next:

time = 0.4863710403442383, answer = 671647.0603678812.
time = 0.6218230724334717, answer = 671647.0603678576.
time = 5.027068376541138, answer = 6716671.888051354.
time = 3.752805471420288, answer = 6716671.888051517.

我认为这与通过Python示例理解时间复杂度

有关

enter image description here

但我可能是错的,我也在努力学习。

附录:

根据之前的好答案,该答案已被删除(不知道为什么):

使用

Multiprocessing.Pool().map
Python 中的多处理 - 所有关于酸洗

Python 中的多重处理非常灵活。您可以根据需要定义流程并编排它们,或者使用一种优秀的方法来聚集流程池。默认情况下,Pool 假定进程数等于 CPU 核心数,但您可以通过传递进程参数来更改它。 Pool 中包含的主要方法是 apply 和 map,它们分别允许您使用任意参数运行进程或执行并行映射。还有这些的异步版本,即 apply_async 和 map_async.Simple,对吧?是的,这就是所需要的。现在,去使用多处理吧! 事实上,在你离开去追寻你的梦想之前,有一个小警告。当执行进程时,Python 首先 pickles 这些方法。这会产生瓶颈,因为只有 pickle 的对象才会传递给进程。此外,Pool 不允许并行化引用运行它们的 pool 实例的对象........................................ …………………………………… ...................................................... ...................................................... ...................... 是的,这都是因为Pickle。你能用它做什么?从某种意义上说,并不算多,因为它是默认的包含电池的解决方案。另一方面,泡菜通常很慢

删除答案:

您正在复制相当大量的数据(> 90MB),因此

fun2
消耗了大量时间进行数据复制(序列化/反序列化、进程间通信),

>>> import pickle
>>> import random
>>> datea = [random.random() for i in range(10 ** 7)]
>>> len(pickle.dumps(data)
90032371

您可以将数据存储在多个文件中,并将文件名分发给工作人员,而不是直接传递大数据。

您可能希望使用并行计算框架(例如 Polars、Dask 和 Ray),具体取决于您的工作负载。

© www.soinside.com 2019 - 2024. All rights reserved.