我正在尝试运行一个 python 文件来打印一些内容,等待 2 秒,然后再次打印。我想从我的 python 脚本中实时捕获这些输出,然后对其进行处理。我尝试了不同的方法,但没有任何效果。
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
while True:
output = process.stdout.readline()
if process.poll() is not None and output == '':
break
if output:
print(output.strip())
我现在处于这一点,但它不起作用。它会等待代码完成,然后打印所有输出。
我只需要运行一个 python 文件并从中获取实时输出,如果您有其他想法,而不使用打印功能,请告诉我,只需知道我必须单独运行该文件即可。我只是想到了最简单的方法,但据我所知,这是不可能完成的。
这是我用于相同目的的代码:
def run_command(command, **kwargs):
"""Run a command while printing the live output"""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
**kwargs,
)
while True: # Could be more pythonic with := in Python3.8+
line = process.stdout.readline()
if not line and process.poll() is not None:
break
print(line.decode(), end='')
使用示例如下:
run_command(['git', 'status'], cwd=Path(__file__).parent.absolute())
这里有三层缓冲,您需要限制所有三层以保证您获得实时数据:
使用
stdbuf
命令(在 Linux 上)包装 subprocess
执行(例如运行 ['stdbuf', '-oL'] + cmd
而不是仅运行 cmd
),或者(如果您有能力这样做)将程序本身更改为显式更改 stdout
上的缓冲(例如,对 C/C++ 代码使用 setvbuf
将 stdout
全局切换到行缓冲模式,而不是输出到非 tty 时使用的默认块缓冲)或插入关键输出后的刷新语句(例如 C/C++ 的 fflush(stdout);
,Python 的 fileobj.flush()
等)将程序缓冲为面向行模式(或添加 fflush);如果没有这个,一切都会卡在子进程的用户模式缓冲区中。
将
bufsize=0
添加到 Popen
参数(可能不需要,因为您不向标准输入发送任何内容,但无害),以便它取消缓冲所有管道句柄。如果 Popen
处于 text=True
模式,请切换到 bufsize=1
(行缓冲,而不是无缓冲)。
将
flush=True
添加到 print
参数(如果您连接到终端,行缓冲将为您刷新它,因此只有当 stdout 通过管道传输到文件时这才重要),或显式致电sys.stdout.flush()
。
在这三者之间,您应该能够保证没有数据在用户模式缓冲区中等待;如果子流程至少输出了一行,它会立即到达您,并且由它触发的任何输出也会立即出现。在大多数情况下,第 1 项是最难的(当您无法使用
stdbuf
,或者进程在内部重新配置其自己的缓冲并撤消 stdbuf
的效果,并且您无法修改进程可执行文件来修复它时) ;您可以完全控制 #2 和 #3,但 #1 可能超出您的控制范围。
如果您想分别处理 stdout 和 stderr,您可以生成两个并发处理它们的线程(在生成输出时实时运行)。
改编自我更详细的答案:
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def stream_command(
args,
*,
stdout_handler=logging.info,
stderr_handler=logging.error,
check=True,
text=True,
stdout=PIPE,
stderr=PIPE,
**kwargs,
):
"""Mimic subprocess.run, while processing the command output in real time."""
with (
Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process,
ThreadPoolExecutor(2) as pool, # two threads to handle the (live) streams separately
):
exhaust = partial(deque, maxlen=0) # collections recipe: exhaust an iterable at C-speed
exhaust_async = partial(pool.submit, exhaust) # exhaust non-blocking in a background thread
exhaust_async(stdout_handler(line[:-1]) for line in process.stdout)
exhaust_async(stderr_handler(line[:-1]) for line in process.stderr)
retcode = process.poll() # block until both iterables are exhausted (process finished)
if check and retcode:
raise CalledProcessError(retcode, process.args)
return CompletedProcess(process.args, retcode)
使用简单的
print
处理程序进行调用:
stream_command(["echo", "test"], stdout_handler=print, stderr_handler=print)
# test
或者使用自定义处理程序:
outs, errs = [], []
def stdout_handler(line):
outs.append(line)
print(line)
def stderr_handler(line):
errs.append(line)
print(line)
stream_command(
["echo", "test"],
stdout_handler=stdout_handler,
stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']