我有一个称为run_3_processes
的函数,该函数使用multiprocessing.pool.apply
产生3个进程(duh),等待它们的结果并处理这些结果并返回一个结果。
我还有另一个名为run_3_processes_3_times
的函数,该函数应运行run_3_processes
3次并行,等待所有函数返回然后处理所有结果。
我尝试过的事情:
run_3_processes_3_times
的处理池-事实证明这很复杂,因为Python Process Pool non-daemonic?threadpool.apply
用于run_3_processes_3_times
–由于某种原因,使其串行而不是并行运行–是因为apply
中的run_3_processes
阻止了GIL?我确定有一个单一的解决方案,我想念...谢谢!
好吧,找到一个错误的答案,很想听听是否有更好的东西:
def run_3_processes_3_times():
pool = ThreadPool(3)
candidates = [pool.apply_async(run_3_processes,
args=(c)) for c in range(3)]
candidates = [x.get() for x in candidates]
pool.close()
def run_3_processes(c):
pool = mp.Pool(3)
solutions = [pool.apply_async(do_it,
args=(i) for i in range(3)]
solutions = [x.get() for x in solutions]
pool.close()
由于您使用的是真正的线程和子流程的组合,因此您将“有点”运行到GIL中,但是它的结构方式使它看起来不太可能成为问题。 ThreadPool
将受上下文切换的约束,以在线程之间提供并发性,但由于其唯一目的是生成子进程,因此它不会占用大量CPU资源。我不确定为什么还要使用多个线程。我可能只会生成一个单线程的父进程,然后直接等待子进程。
在这两个函数中,使用map()
方法而不是apply_async()
可能更惯用,尽管两者都可以。通常那看起来像这样:
process_count = 3
def pre_process(input_data):
input_subsets = [[]] * process_count
for idx, data_point in enumerate(input_data):
<do any input validation on data_point>
input_subsets[idx % process_count].append(data_point)
return input_subsets
def process_data(input_data):
return_val = []
for val in input_data:
<do some processing work>
return_val.append(<result of processing>)
return return_val
data_subsets = pre_process(raw_data)
pool = mp.Pool(process_count)
result_list = pool.map(process_data, data_subsets)
<check result_list>