我需要运行数十个计算密集型 CPU 密集型并行任务。目前我使用
joblib
delayed
和 Parallel
:
resultTuples = Parallel(n_jobs=-1, prefer="processes")(delayed(RunSingleTask)(*p) for p in run_params)
它工作正常,但我必须等到所有任务完成才能获取结果列表并在之后处理它们。我希望在准备好处理每个完成的任务后立即获取其结果。
任务没有 IO,所以我看不出使用异步东西有任何目的:
for first_completed in asyncio.as_completed(tasks):
...
那么我该怎么做呢?
您可以使用
ProcessPoolExecutor()
和 as_completed()
:
from concurrent.futures import ProcessPoolExecutor, as_completed
def _sum(a, b):
return a + b
if __name__ == '__main__':
inputs = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
with ProcessPoolExecutor() as executor:
fts = [executor.submit(_sum, a, b) for a, b in inputs]
for f in as_completed(fts):
print(f.result())
3
7
11
15
19