Concurrent futures ThreadPoolExecutor: no improvement for different numbers of workers


I am trying to use concurrent.futures to run a task in parallel. Please find a snippet of the code below:

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
import concurrent.futures

# num CPUs
cpu_num = len(os.sched_getaffinity(0))
print("Number of cpu available : ",cpu_num)

# max_Worker = cpu_num
max_Worker = 1

# A fake input array
n=1000000
array = list(range(n))
results = []

# A fake function being applied to each element of array 
def task(i):
  return i**2 

x = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_Worker) as executor:
  features = {executor.submit(task, j) for j in array}

  # the real function is heavy and we need to be sure of completeness of each run
  for future in concurrent.futures.as_completed(features):
    results.append(future.result())
      
results = [future.result() for future in features]
y = time.time()

print('=========================================')
print(f"Train data preparation time (s): {(y-x)}")
print('=========================================')

Now for my questions:

  1. Although it raises no errors, is it correct/optimized?
  2. When playing with the number of workers, there seems to be no speed improvement (e.g., 1 vs. 16 makes no difference). What is the problem, and how can it be fixed?

Thanks in advance,

python python-3.x multithreading pool concurrent.futures
1 Answer (score: 2)

See my comments to your question. To the overhead I mentioned in those comments, you also need to add the overhead of creating the process pool itself.
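To put rough numbers on that overhead, here is a minimal sketch (my own illustration, not from the original answer): it times the trivial task both as a bare function call and routed through a single-worker process pool, where every argument and result must be pickled and shuttled through inter-process queues, which for a function as cheap as i ** 2 dwarfs the computation itself:

```python
import time
from concurrent.futures import ProcessPoolExecutor

def task(i):
    return i ** 2

if __name__ == '__main__':
    n = 10_000

    # Cost of calling the function directly.
    t1 = time.time()
    for i in range(n):
        task(i)
    call_time = time.time() - t1

    # The same calls through a process pool: each submit pickles the
    # argument, writes it to a queue, and pickles the result back.
    with ProcessPoolExecutor(max_workers=1) as executor:
        t1 = time.time()
        for future in [executor.submit(task, i) for i in range(n)]:
            future.result()
        pool_time = time.time() - t1

    print(f'bare calls: {call_time:.4f}s, via process pool: {pool_time:.4f}s')
```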

Below is a benchmark with several results. The first is the time to call the worker function task 100000 times, build the results list, and print the last element of that list. It will become apparent why I reduced the number of calls to task from 1000000 down to 100000.

The next attempt does the same thing using multiprocessing with a ProcessPoolExecutor and its submit method, then processes the returned Future instances.

The next attempt uses the map method with the default chunksize argument of 1. It is important to understand this argument. With a chunksize value of 1, each element of the iterable passed to the map method is written individually to the task queue as a chunk to be processed by a process in the pool. When a pool process becomes idle looking for work, it pulls the next chunk of tasks to perform off the queue, processes each task comprising the chunk, and then becomes idle again. When a lot of tasks are being submitted via map, a chunksize value of 1 is inefficient. You would expect its performance to be equivalent to repeatedly issuing submit calls for each element of the iterable.
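As a rough illustration of that queueing behavior (my own sketch; the chunks helper is hypothetical, not an API of concurrent.futures): grouping the same iterable with a larger chunksize cuts the number of task-queue entries, and hence pickling round-trips, proportionally:

```python
def chunks(seq, chunksize):
    """Split seq into consecutive pieces of at most chunksize elements,
    mimicking how map groups the iterable into task-queue entries."""
    for i in range(0, len(seq), chunksize):
        yield seq[i:i + chunksize]

data = list(range(10))
print(list(chunks(data, 1)))  # chunksize=1: 10 queue entries, one per element
print(list(chunks(data, 4)))  # chunksize=4: only 3 queue entries
```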

The next attempt specifies a chunksize value that more or less approximates the default chunksize value used by the map function of the Pool class in the multiprocessing package. As you can see, the improvement is dramatic, but it is still not an improvement over the non-multiprocessing case.

The final attempt uses the multiprocessing facilities provided by the multiprocessing package and its multiprocessing.pool.Pool class. Where this benchmark differs is that its map function uses a smarter default chunksize when no chunksize argument is specified.
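That smarter default can be sketched as follows. The helper name is my own, but the heuristic mirrors what CPython's multiprocessing.pool implementation does when map is called without a chunksize: split the iterable into roughly four chunks per pool process, rounding up. It reproduces the chunksize values seen in the benchmark outputs below (3125 for n=100000 with 8 CPUs, 4 for n=100):

```python
def default_chunksize(iterable_size, pool_size):
    # Aim for roughly 4 chunks per pool process, rounding up -- the
    # heuristic multiprocessing.pool.Pool.map applies when no
    # chunksize argument is given.
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(100000, 8))  # -> 3125
print(default_chunksize(100, 8))     # -> 4
```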

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool

# A fake function being applied to each element of array
def task(i):
  return i**2

# required for Windows:
if __name__ == '__main__':
    n=100000

    t1 = time.time()
    results = [task(i) for i in range(n)]
    print('Non-multiprocessing time:', time.time() - t1, results[-1])

    # num CPUs
    cpu_num = os.cpu_count()
    print("Number of CPUs available: ",cpu_num)

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
    print('Multiprocessing time using submit:', time.time() - t1,  results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
    print('Multiprocessing time using map:', time.time() - t1, results[-1])

    t1 = time.time()
    chunksize = n // (4 * cpu_num)
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
    print(f'Multiprocessing time using map: {time.time() - t1}, chunksize: {chunksize}', results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
    print('Multiprocessing time using Pool.map:', time.time() - t1, results[-1])

Prints:

Non-multiprocessing time: 0.027019739151000977 9999800001
Number of CPUs available:  8
Multiprocessing time using submit: 77.34723353385925 9999800001
Multiprocessing time using map: 79.52981925010681 9999800001
Multiprocessing time using map: 0.30500149726867676, chunksize: 3125 9999800001
Multiprocessing time using Pool.map: 0.2799997329711914 9999800001

Update

The following benchmark uses a version of task that is very CPU-intensive and demonstrates the benefit of multiprocessing. For this small iterable size (100), forcing a chunksize value of 1 in the Pool.map case (which would otherwise compute a default chunksize value of 4) appears to be slightly more performant.

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool

# A fake function being applied to each element of array
def task(i):
    for _ in range(1_000_000):
        result = i ** 2
    return result

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize

# required for Windows:
if __name__ == '__main__':
    n = 100
    cpu_num = os.cpu_count()
    chunksize = compute_chunksize(n, cpu_num)

    t1 = time.time()
    results = [task(i) for i in range(n)]
    t2 = time.time()
    print('Non-multiprocessing time:', t2 - t1, results[-1])

    # num CPUs
    print("Number of CPUs available: ",cpu_num)

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
        t2 = time.time()
    print('Multiprocessing time using submit:', t2 - t1,  results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
        t2 = time.time()
    print('Multiprocessing time using map:', t2 - t1, results[-1])

    t1 = time.time()

    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
        t2 = time.time()
    print(f'Multiprocessing time using map: {t2 - t1}, chunksize: {chunksize}', results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
        t2 = time.time()
    print('Multiprocessing time using Pool.map:', t2 - t1, results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n), chunksize=1)
        t2 = time.time()
    print('Multiprocessing time using Pool.map (chunksize=1):', t2 - t1, results[-1])

Prints:

Non-multiprocessing time: 23.12758779525757 9801
Number of CPUs available:  8
Multiprocessing time using submit: 5.336004018783569 9801
Multiprocessing time using map: 5.364996671676636 9801
Multiprocessing time using map: 5.444890975952148, chunksize: 4 9801
Multiprocessing time using Pool.map: 5.400001287460327 9801
Multiprocessing time using Pool.map (chunksize=1): 4.698001146316528 9801