我正在尝试使用并发来并行实现任务。请在下面找到一段代码:
import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
import concurrent.futures
# num CPUs
cpu_num = len(os.sched_getaffinity(0))
print("Number of cpu available : ",cpu_num)
# max_Worker = cpu_num
max_Worker = 1
# A fake input array
n=1000000
array = list(range(n))
results = []
# A fake function being applied to each element of array
def task(i):
return i**2
x = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_Worker) as executor:
features = {executor.submit(task, j) for j in array}
# the real function is heavy and we need to be sure of completeness of each run
for future in concurrent.futures.as_completed(features):
results.append(future.result())
results = [future.result() for future in features]
y = time.time()
print('=========================================')
print(f"Train data preparation time (s): {(y-x)}")
print('=========================================')
现在是我的问题,
提前致谢,
请参阅我对您问题的评论。对于我在评论中提到的开销,您还需要添加创建进程池本身的开销。
以下是包含多个结果的基准。第一个是调用工作函数
task
100000 次并创建 results
列表并打印出该列表的最后一个元素的时间。显而易见,为什么我将拨打task
的次数从 1000000 减少到 100000。
下一个尝试是使用多重处理来完成同样的事情,使用 ProcessPoolExecutor 和
submit
方法,然后处理返回的 Future
实例。
下一次尝试是使用
map
方法,并使用默认的 chunksize 参数 1。理解这个论点很重要。当 chunksize 值为 1 时,传递给 map
方法的 iterable的每个元素都会作为要由池中的进程处理的块单独写入任务队列。当池进程空闲寻找工作时,它从队列中提取下一个要执行的任务块,处理包含该块的每个任务,然后再次空闲。当通过
map
提交大量已提交任务时,chunksize 值为 1 效率较低。您会期望它的性能相当于对 iterable的每个元素重复发出
submit
调用。
下一次尝试指定一个 chunksize 值,该值或多或少近似于
map
包中的 Pool
类使用的 multiprocessing
函数默认使用的值。正如您所看到的,改进是显着的,但与非多处理情况相比仍然没有改进。
最后的尝试使用包
multiprocessing
及其 multiprocessing.pool.Pool
类提供的多处理设施。此基准测试的不同之处在于,当未指定 chunksize参数时,其
map
函数使用更智能的默认值 chunksize。
import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool
# A fake function being applied to each element of array
def task(i):
return i**2
# required for Windows:
if __name__ == '__main__':
n=100000
t1 = time.time()
results = [task(i) for i in range(n)]
print('Non-multiprocessing time:', time.time() - t1, results[-1])
# num CPUs
cpu_num = os.cpu_count()
print("Number of CPUs available: ",cpu_num)
t1 = time.time()
with PE(max_workers=cpu_num) as executor:
futures = [executor.submit(task, i) for i in range(n)]
results = [future.result() for future in futures]
print('Multiprocessing time using submit:', time.time() - t1, results[-1])
t1 = time.time()
with PE(max_workers=cpu_num) as executor:
results = list(executor.map(task, range(n)))
print('Multiprocessing time using map:', time.time() - t1, results[-1])
t1 = time.time()
chunksize = n // (4 * cpu_num)
with PE(max_workers=cpu_num) as executor:
results = list(executor.map(task, range(n), chunksize=chunksize))
print(f'Multiprocessing time using map: {time.time() - t1}, chunksize: {chunksize}', results[-1])
t1 = time.time()
with Pool(cpu_num) as executor:
results = executor.map(task, range(n))
print('Multiprocessing time using Pool.map:', time.time() - t1, results[-1])
打印:
Non-multiprocessing time: 0.027019739151000977 9999800001
Number of CPUs available: 8
Multiprocessing time using submit: 77.34723353385925 9999800001
Multiprocessing time using map: 79.52981925010681 9999800001
Multiprocessing time using map: 0.30500149726867676, chunksize: 3125 9999800001
Multiprocessing time using Pool.map: 0.2799997329711914 9999800001
更新
以下基准测试使用了
task
的版本,该版本非常占用 CPU 资源,并显示了多处理的优势。对于这个小的 iterable 大小(100),在 Pool.map
情况下强制 chunksize值为 1(默认情况下会计算 chunksize 值为 4),性能似乎稍微更高一些。
import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool
# A fake function being applied to each element of array
def task(i):
for _ in range(1_000_000):
result = i ** 2
return result
def compute_chunksize(iterable_size, pool_size):
chunksize, remainder = divmod(iterable_size, pool_size * 4)
if remainder:
chunksize += 1
return chunksize
# required for Windows:
if __name__ == '__main__':
n = 100
cpu_num = os.cpu_count()
chunksize = compute_chunksize(n, cpu_num)
t1 = time.time()
results = [task(i) for i in range(n)]
t2 = time.time()
print('Non-multiprocessing time:', t2 - t1, results[-1])
# num CPUs
print("Number of CPUs available: ",cpu_num)
t1 = time.time()
with PE(max_workers=cpu_num) as executor:
futures = [executor.submit(task, i) for i in range(n)]
results = [future.result() for future in futures]
t2 = time.time()
print('Multiprocessing time using submit:', t2 - t1, results[-1])
t1 = time.time()
with PE(max_workers=cpu_num) as executor:
results = list(executor.map(task, range(n)))
t2 = time.time()
print('Multiprocessing time using map:', t2 - t1, results[-1])
t1 = time.time()
with PE(max_workers=cpu_num) as executor:
results = list(executor.map(task, range(n), chunksize=chunksize))
t2 = time.time()
print(f'Multiprocessing time using map: {t2 - t1}, chunksize: {chunksize}', results[-1])
t1 = time.time()
with Pool(cpu_num) as executor:
results = executor.map(task, range(n))
t2 = time.time()
print('Multiprocessing time using Pool.map:', t2 - t1, results[-1])
t1 = time.time()
with Pool(cpu_num) as executor:
results = executor.map(task, range(n), chunksize=1)
t2 = time.time()
print('Multiprocessing time using Pool.map (chunksize=1):', t2 - t1, results[-1])
打印:
Non-multiprocessing time: 23.12758779525757 9801
Number of CPUs available: 8
Multiprocessing time using submit: 5.336004018783569 9801
Multiprocessing time using map: 5.364996671676636 9801
Multiprocessing time using map: 5.444890975952148, chunksize: 4 9801
Multiprocessing time using Pool.map: 5.400001287460327 9801
Multiprocessing time using Pool.map (chunksize=1): 4.698001146316528 9801