我有一个第 3 方 cli 可执行文件,我需要从我的 python 代码中调用。这些都是繁重的计算(CPU),我需要调用它大约 50-100 次。可执行文件本身在某种程度上是多线程的,但不是所有步骤,而且我有很多可用的核心。
这意味着我想同时运行多个子进程,但不是全部。所以我需要提交其中一些,然后跟踪其中一个完成后,启动一个新的以优化 CPU 使用率。
我有一个工作版本,但它非常幼稚,但它有效。它只是等待第一个提交的进程完成。但它依赖于数据,因此在某些时候,一个运行时间很长的进程是其余进程中提交的“第一个”进程,并且它只会阻止任何其他进程提交,直到该进程完成为止。还有长顺序/IO?阶段,这个长时间运行的阶段可能会占用 1% 的 cpu 数小时,在此期间 cpu 大部分处于空闲状态。
所以我需要优化我的流程提交代码:
num_concurrent_processes = 6
for path in files:
# Building cmd omitted
rs_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
bufsize=1, creationflags=0x00000008)
rs_processes.append(rs_process)
if len(rs_processes) >= num_concurrent_processes:
# wait until at least one process has completed before submitting the next one
continue_polling = True
while continue_polling:
for idx, proc in enumerate(rs_processes):
poll = proc.poll()
if poll is not None:
# process complete
stdout_data, stderr_data = proc.communicate()
# removed some logging in case of error
#remove completed process from list
del rs_processes[idx]
# exit polling loop as a new subprocess can be submitted
continue_polling = False
break
if continue_polling:
# Put some breaks on the polling
time.sleep(10)
如果达到限制,我将阻止提交更多子进程。然后我进行轮询,直到找到一个已完成的进程(根据 python 文档poll 不是 None )。我通过通信获取进程的输出,并对其执行一些操作(记录,未显示)。然后我从“跟踪列表”中删除该进程并退出轮询循环。
但是代码中存在某种逻辑缺陷,因为运行时“陈旧”进程会累积。他们的支持率都为 0%,但不知何故,民意调查并不认为他们完成了任务。一旦出现
num_concurrent_processes
过时的流程,进度就会完全停止。这里出了什么问题?使用 poll() 是否意味着我现在需要手动终止该进程?使用旧方法,所有进程都运行良好并自行停止,我在新方法中使用相同的数据。
如果进程需要写入 stdout 或 stderr,它可能会阻塞,直到您从 Python 中相应的管道中读取数据。因此,某些进程可能实际上正在等待您调用communicate(),但您要等到它们退出后再调用communicate()。
您有以下几种选择: