Python:提交和跟踪许多子流程会导致子流程“卡住”

问题描述 投票:0回答:1

我有一个第 3 方 cli 可执行文件,我需要从我的 python 代码中调用。这些都是繁重的计算(CPU),我需要调用它大约 50-100 次。可执行文件本身在某种程度上是多线程的,但不是所有步骤,而且我有很多可用的核心。

这意味着我想同时运行多个子进程,但不是全部。所以我需要提交其中一些,然后跟踪其中一个完成后,启动一个新的以优化 CPU 使用率。

我有一个工作版本,但它非常幼稚,但它有效。它只是等待第一个提交的进程完成。但它依赖于数据,因此在某些时候,一个运行时间很长的进程是其余进程中提交的“第一个”进程,并且它只会阻止任何其他进程提交,直到该进程完成为止。还有长顺序/IO?阶段,这个长时间运行的阶段可能会占用 1% 的 cpu 数小时,在此期间 cpu 大部分处于空闲状态。

所以我需要优化我的流程提交代码:

num_concurrent_processes = 6
for path in files:
    # Building cmd omitted
    rs_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
                                                  bufsize=1, creationflags=0x00000008)
    rs_processes.append(rs_process) 

    if len(rs_processes) >= num_concurrent_processes:
        # wait until at least one process has completed before submitting the next one
        continue_polling = True
        while continue_polling:
            for idx, proc in enumerate(rs_processes):
                poll = proc.poll()
                if poll is not None:
                    # process complete
                    stdout_data, stderr_data = proc.communicate()
                    
                    # removed some logging in case of error
                    
                    #remove completed process from list
                    del rs_processes[idx]
                    # exit polling loop as a new subprocess can be submitted
                    continue_polling = False
                    break
            if continue_polling:
                # Put some breaks on the polling
                time.sleep(10)

如果达到限制,我将阻止提交更多子进程。然后我进行轮询,直到找到一个已完成的进程(根据 python 文档poll 不是 None )。我通过通信获取进程的输出,并对其执行一些操作(记录,未显示)。然后我从“跟踪列表”中删除该进程并退出轮询循环。

但是代码中存在某种逻辑缺陷,因为运行时“陈旧”进程会累积。他们的支持率都为 0%,但不知何故,民意调查并不认为他们完成了任务。一旦出现

num_concurrent_processes
过时的流程,进度就会完全停止。这里出了什么问题?使用 poll() 是否意味着我现在需要手动终止该进程?使用旧方法,所有进程都运行良好并自行停止,我在新方法中使用相同的数据。

python popen
1个回答
0
投票

如果进程需要写入 stdout 或 stderr,它可能会阻塞,直到您从 Python 中相应的管道中读取数据。因此,某些进程可能实际上正在等待您调用communicate(),但您要等到它们退出后再调用communicate()。

您有以下几种选择:

  1. Python asyncio 提供使用异步编程并行监控多个子进程的功能;有关更多详细信息和示例,请参阅 https://docs.python.org/3/library/asyncio-subprocess.html。您可以使用信号量来控制活动子进程的数量。
  2. 您可以生成一个线程来处理每个子进程。线程可以彼此独立地阻塞,因此每个子进程处理线程可以简单地调用 .communicate。
  3. 您可以使用具有固定数量工作线程的 multiprocessing.pool.ThreadPool。对于您的特定用例,这可能是最直接的方法。构造一个 ThreadPool,将工作线程数设置为要并行运行的进程数,然后只需调用 pool.map 或类似函数来启动作业。
© www.soinside.com 2019 - 2024. All rights reserved.