来自 Python 子进程的实时输出/流

问题描述 投票:0回答:4

我正在使用Python及其子进程库来使用strace检查调用的输出,事情是这样的:

subprocess.check_output(["strace", str(processname)]) 

但是,这只会在被调用的子进程完成之后给出输出,这对我的用例来说非常有限。 我需要进程的一种“流”或实时输出,因此我需要在进程仍在运行时读取输出,而不是仅在进程完成后读取输出。

有没有一种方便的方法来使用子流程库来实现这一点? 我正在考虑每 x 秒进行一次轮询,但没有找到有关如何在文档中实现此操作的任何提示。 提前非常感谢。

从 Python 3.2 开始(当上下文管理器支持添加到

Popen
python stream console subprocess
4个回答
18
投票

import subprocess def run(args): with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as process: for line in process.stdout: print(line.decode('utf8'))

    

在引用
选定的答案

9
投票

import subprocess from time import sleep def stream_process(process): go = process.poll() is None for line in process.stdout: print(line) return go process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) while stream_process(process): sleep(0.1)

根据
文档

5
投票

Popen.poll()

检查子进程是否已终止。设置并返回 returncode 属性。

所以基于此你可以:

process = subprocess.Popen('your_command_here',stdout=subprocess.PIPE) while True: output = process.stdout.readline() if process.poll() is not None and output == '': break if output: print (output.strip()) retval = process.poll()

这将循环读取标准输出,并实时显示输出。

这在当前版本的 python 中不起作用。 (至少)对于 Python 3.8.5 及更高版本,您应该将 

output == ''

替换为
output == b''

    
如果您想分别处理 stdout 和 stderr,您可以生成两个并发处理它们的线程(在生成输出时实时运行)。


0
投票
答案

import logging from collections import deque from concurrent.futures import ThreadPoolExecutor from functools import partial from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen def stream_command( args, *, stdout_handler=logging.info, stderr_handler=logging.error, check=True, text=True, stdout=PIPE, stderr=PIPE, **kwargs, ): """Mimic subprocess.run, while processing the command output in real time.""" with ( Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process, ThreadPoolExecutor(2) as pool, # two threads to handle the (live) streams separately ): exhaust = partial(deque, maxlen=0) # collections recipe: exhaust an iterable at C-speed exhaust_async = partial(pool.submit, exhaust) # exhaust non-blocking in a background thread exhaust_async(stdout_handler(line[:-1]) for line in process.stdout) exhaust_async(stderr_handler(line[:-1]) for line in process.stderr) retcode = process.poll() # block until both iterables are exhausted (process finished) if check and retcode: raise CalledProcessError(retcode, process.args) return CompletedProcess(process.args, retcode) 使用简单的

print
 处理程序进行调用:

stream_command(["echo", "test"], stdout_handler=print, stderr_handler=print) # test

或者使用自定义处理程序:

outs, errs = [], []
def stdout_handler(line):
    outs.append(line)
    print(line)
def stderr_handler(line):
    errs.append(line)
    print(line)

stream_command(
    ["echo", "test"],
    stdout_handler=stdout_handler,
    stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']

© www.soinside.com 2019 - 2024. All rights reserved.