如何在Python中使用子进程获取实时输出

问题描述 投票:0回答:3

我正在尝试运行一个 python 文件来打印一些内容,等待 2 秒,然后再次打印。我想从我的 python 脚本中实时捕获这些输出,然后对其进行处理。我尝试了不同的方法,但没有任何效果。

process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
while True:
    output = process.stdout.readline()
    if process.poll() is not None and output == '':
        break
    if output:
        print(output.strip())

我现在处于这一点,但它不起作用。它会等待代码完成,然后打印所有输出。

我只需要运行一个 python 文件并从中获取实时输出,如果您有其他想法,而不使用打印功能,请告诉我,只需知道我必须单独运行该文件即可。我只是想到了最简单的方法,但据我所知,这是不可能完成的。

python subprocess stdout
3个回答
2
投票

这是我用于相同目的的代码:

def run_command(command, **kwargs):
    """Run a command while printing the live output"""
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        **kwargs,
    )
    while True:   # Could be more pythonic with := in Python3.8+
        line = process.stdout.readline()
        if not line and process.poll() is not None:
            break
        print(line.decode(), end='')

使用示例如下:

run_command(['git', 'status'], cwd=Path(__file__).parent.absolute())

1
投票

这里有三层缓冲,您需要限制所有三层以保证您获得实时数据:

  1. 使用

    stdbuf
    命令(在 Linux 上)包装
    subprocess
    执行(例如运行
    ['stdbuf', '-oL'] + cmd
    而不是仅运行
    cmd
    ),或者(如果您有能力这样做)将程序本身更改为显式更改
    stdout
    上的缓冲(例如,对 C/C++ 代码使用
    setvbuf
    stdout
    全局切换到行缓冲模式,而不是输出到非 tty 时使用的默认块缓冲)或插入关键输出后的刷新语句(例如 C/C++ 的
    fflush(stdout);
    ,Python 的
    fileobj.flush()
    等)将程序缓冲为面向行模式(或添加 fflush);如果没有这个,一切都会卡在子进程的用户模式缓冲区中。

  2. bufsize=0
    添加到
    Popen
    参数(可能不需要,因为您不向标准输入发送任何内容,但无害),以便它取消缓冲所有管道句柄。如果
    Popen
    处于
    text=True
    模式,请切换到
    bufsize=1
    (行缓冲,而不是无缓冲)。

  3. flush=True
    添加到
    print
    参数(如果您连接到终端,行缓冲将为您刷新它,因此只有当 stdout 通过管道传输到文件时这才重要),或显式致电
    sys.stdout.flush()

在这三者之间,您应该能够保证没有数据在用户模式缓冲区中等待;如果子流程至少输出了一行,它会立即到达您,并且由它触发的任何输出也会立即出现。在大多数情况下,第 1 项是最难的(当您无法使用

stdbuf
,或者进程在内部重新配置其自己的缓冲并撤消
stdbuf
的效果,并且您无法修改进程可执行文件来修复它时) ;您可以完全控制 #2 和 #3,但 #1 可能超出您的控制范围。


0
投票

如果您想分别处理 stdout 和 stderr,您可以生成两个并发处理它们的线程(在生成输出时实时运行)。

改编自我更详细的答案

import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen


def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time."""
    with (
        Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process,
        ThreadPoolExecutor(2) as pool,  # two threads to handle the (live) streams separately
    ):
        exhaust = partial(deque, maxlen=0)  # collections recipe: exhaust an iterable at C-speed
        exhaust_async = partial(pool.submit, exhaust)  # exhaust non-blocking in a background thread
        exhaust_async(stdout_handler(line[:-1]) for line in process.stdout)
        exhaust_async(stderr_handler(line[:-1]) for line in process.stderr)
    retcode = process.poll()  # block until both iterables are exhausted (process finished)
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)

使用简单的

print
处理程序进行调用:

stream_command(["echo", "test"], stdout_handler=print, stderr_handler=print)
# test

或者使用自定义处理程序:

outs, errs = [], []
def stdout_handler(line):
    outs.append(line)
    print(line)
def stderr_handler(line):
    errs.append(line)
    print(line)

stream_command(
    ["echo", "test"],
    stdout_handler=stdout_handler,
    stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']
© www.soinside.com 2019 - 2024. All rights reserved.