子进程命令的实时输出

问题描述 投票:143回答:14

我正在使用python脚本作为流体动力学代码的驱动程序。当运行模拟时,我使用subprocess.Popen运行代码,将stdout和stderr的输出收集到subprocess.PIPE ---然后我可以打印(并保存到日志文件)输出信息,并检查任何错误。问题是,我不知道代码是如何进展的。如果我直接从命令行运行它,它会给我输出关于它在什么时间迭代,什么时间,下一个时间步骤是什么等等的输出。

有没有办法存储输出(用于记录和错误检查),还产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初我通过run_command管道tee,以便副本直接进入日志文件,并且流仍然直接输出到终端 - 但这样我不能存储任何错误(对我的知识)。


编辑:

临时解决方案:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端,运行tail -f log.txt(s.t。log_file = 'log.txt')。

python shell logging error-handling subprocess
14个回答
137
投票

有两种方法可以通过从readreadline函数创建迭代器来实现:

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), ''):  # replace '' with b'' for Python 3
        sys.stdout.write(c)
        f.write(c)

要么

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, ''):  # replace '' with b'' for Python 3
        sys.stdout.write(line)
        f.write(line)

或者你可以创建一个reader和一个writer文件。将writer传递给Popen并从reader读取

import io
import time
import subprocess
import sys

filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

这样,您将使用test.log以及标准输出编写数据。

文件方法的唯一优点是您的代码不会阻塞。因此,您可以在此期间做任何您想做的事情,并以非阻塞的方式随时阅读reader。当您使用PIPE时,readreadline函数将阻塞,直到将一个字符写入管道或分别将一行写入管道。


0
投票

除了所有这些答案,一个简单的方法也可以如下:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

只要它是可读的就循环通过可读流,如果它得到一个空结果,则停止它。

这里的关键是readline()返回一条线(最后有\n),只要有一个输出,如果它真的在最后则为空。

希望这有助于某人。


0
投票

与之前的答案类似,但以下解决方案适用于我在Windows上使用Python3提供实时打印和登录的常用方法(getting-realtime-output-using-python):

def print_and_log(command, logFile):
    with open(logFile, 'wb') as f:
        command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

        while True:
            output = command.stdout.readline()
            if not output and command.poll() is not None:
                f.close()
                break
            if output:
                f.write(output)
                print(str(output.strip(), 'utf-8'), flush=True)
        return command.poll()

0
投票

基于以上所有我建议略微修改版本(python3):

  • while循环调用readline(建议的iter解决方案似乎永远阻止我 - Python 3,Windows 7)
  • 结构化,因此在轮询返回not-qazxsw poi之后,不需要复制读取数据的处理
  • stderr通过管道传输到stdout,因此读取了两个输出输出
  • 添加了代码以获取cmd的退出值。

码:

None

0
投票

我认为import subprocess proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) while True: rd = proc.stdout.readline() print(rd, end='') # and whatever you want to do... if not rd: # EOF returncode = proc.poll() if returncode is not None: break time.sleep(0.1) # cmd closed stdout, but not exited yet # You may want to check on ReturnCode here 方法有点误导:它实际上填充了你在subprocess.communicate中指定的stdout和stderr。

然而,从subprocess.Popen读取你可以提供给subprocess.PIPE的stdout和stderr参数将最终填满OS管道缓冲区并使你的应用程序死锁(特别是如果你有多个进程/线程必须使用subprocess.Popen)。

我建议的解决方案是为stdout和stderr提供文件 - 并读取文件的内容,而不是从死锁subprocess中读取。这些文件可以是PIPE - 也可以在tempfile.NamedTemporaryFile()写入时进行读取。

以下是一个示例用法:

subprocess.communicate

这是源代码,可以使用尽可能多的注释来解释它的作用:

如果您使用的是python 2,请确保首先从pypi安装最新版本的subprocess32软件包。

        try:
            with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        catch ProcessError as e:
            print(e.error_message)
            raise

0
投票

没有一个Pythonic解决方案适合我。事实证明, import os import sys import threading import time import tempfile import logging if os.name == 'posix' and sys.version_info[0] < 3: # Support python 2 import subprocess32 as subprocess else: # Get latest and greatest from python 3 import subprocess logger = logging.getLogger(__name__) class ProcessError(Exception): """Base exception for errors related to running the process""" class ProcessTimeout(ProcessError): """Error that will be raised when the process execution will exceed a timeout""" class ProcessRunner(object): def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs): """ Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the Process Runner. This is a class that should be used as a context manager - and that provides an iterator for reading captured output from subprocess.communicate in near realtime. Example usage: try: with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner: for out in process_runner: print(out) catch ProcessError as e: print(e.error_message) raise :param args: same as subprocess.Popen :param env: same as subprocess.Popen :param timeout: same as subprocess.communicate :param bufsize: same as subprocess.Popen :param seconds_to_wait: time to wait between each readline from the temporary file :param kwargs: same as subprocess.Popen """ self._seconds_to_wait = seconds_to_wait self._process_has_timed_out = False self._timeout = timeout self._process_done = False self._std_file_handle = tempfile.NamedTemporaryFile() self._process = subprocess.Popen(args, env=env, bufsize=bufsize, stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs) self._thread = threading.Thread(target=self._run_process) self._thread.daemon = True def __enter__(self): self._thread.start() return self def __exit__(self, exc_type, exc_val, exc_tb): self._thread.join() self._std_file_handle.close() def __iter__(self): # read all output from stdout file that subprocess.communicate fills with open(self._std_file_handle.name, 'r') as stdout: # while process is alive, keep reading data while not self._process_done: out = stdout.readline() out_without_trailing_whitespaces = out.rstrip() if out_without_trailing_whitespaces: # yield stdout data without trailing \n yield out_without_trailing_whitespaces else: # if there is nothing to read, then please wait a tiny little bit time.sleep(self._seconds_to_wait) # this is a hack: terraform seems to write to buffer after process has finished out = stdout.read() if out: yield out if self._process_has_timed_out: raise ProcessTimeout('Process has timed out') if self._process.returncode != 0: raise ProcessError('Process has failed') def _run_process(self): try: # Start gathering information (stdout and stderr) from the opened process self._process.communicate(timeout=self._timeout) # Graceful termination of the opened process self._process.terminate() except subprocess.TimeoutExpired: self._process_has_timed_out = True # Force termination of the opened process self._process.kill() self._process_done = True @property def return_code(self): return self._process.returncode 或类似的可能永远阻止。

因此,我像这样使用proc.stdout.read()

tee

如果您已经在使用subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash') ,这个解决方案很方便。

shell=True捕获整个命令链的成功状态(仅在Bash中可用)。如果我省略了${PIPESTATUS},那么这总是会返回零,因为&& exit ${PIPESTATUS}永远不会失败。

tee可能需要立即将每条线打印到终端,而不是等待太长时间,直到“管道缓冲区”被填满。然而,unbuffer吞下断言的退出状态(SIG Abort)......

qazxsw poi还将stderr记录到文件中。


0
投票

解决方案1:实时同时记录unbuffer2>&1

一个简单的解决方案,实时同时记录stdoutstderr

stdout

解决方案2:创建一个迭代器,逐行同时返回stderrimport subprocess as sp from concurrent.futures import ThreadPoolExecutor def log_popen_pipe(p, pipe_name): while p.poll() is None: line = getattr(p, pipe_name).readline() log_file.write(line) with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p: with ThreadPoolExecutor(2) as pool: r1 = pool.submit(log_popen_pipe, p, 'stdout') r2 = pool.submit(log_popen_pipe, p, 'stderr') r1.result() r2.result()

在这里,我们创建一个函数stdout,允许您同时迭代两个管道(stderr / read_popen_pipes()):

stdout

77
投票

执行摘要(或“tl; dr”版本):当最多只有一个subprocess.PIPE时很容易,否则很难。

可能是时候解释一下subprocess.Popen如何做到这一点。

(警告:这是针对Python 2.x,尽管3.x类似;而且我对Windows变体很模糊。我更了解POSIX的东西。)

Popen函数需要同时处理零到三个I / O流。这些通常表示为stdinstdoutstderr

你可以提供:

  • None,表示您不想重定向流。它将像往常一样继承这些。请注意,至少在POSIX系统上,这并不意味着它将使用Python的sys.stdout,只是Python的实际标准输出;最后看看demo。
  • 一个int值。这是一个“原始”文件描述符(至少在POSIX中)。 (旁注:PIPESTDOUT实际上是内部的ints,但是“不可能”的描述符,-1和-2。)
  • 一个流 - 真的,任何带有fileno方法的对象。 Popen将使用stream.fileno()找到该流的描述符,然后继续使用int值。
  • subprocess.PIPE,表明Python应该创建一个管道。
  • subprocess.STDOUT(仅适用于stderr):告诉Python使用与stdout相同的描述符。这只有在为None提供(非stdout)值时才有意义,即便如此,只有在设置stdout=subprocess.PIPE时才需要它。 (否则你可以提供你为stdout提供的相同参数,例如,Popen(..., stdout=stream, stderr=stream)。)

The easiest cases (no pipes)

如果你没有重定向(将所有三个保留为默认的None值或提供明确的None),Pipe非常容易。它只需要分离子进程并让它运行。或者,如果您重定向到非PIPE-an int或流的fileno()-它仍然很容易,因为操作系统完成所有工作。 Python只需要分离子进程,将其stdin,stdout和/或stderr连接到提供的文件描述符。

The still-easy case: one pipe

如果你只重定向一个流,Pipe仍然很容易。我们一次选择一个流并观看。

假设你想提供一些stdin,但让stdoutstderr去重定向,或转到文件描述符。作为父进程,您的Python程序只需使用write()向管道发送数据。你可以自己做,例如:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

或者你可以将stdin数据传递给proc.communicate(),然后执行上面显示的stdin.write。没有输出回来所以communicate()只有另一个真正的工作:它也为你关闭管道。 (如果不调用proc.communicate(),则必须调用proc.stdin.close()来关闭管道,以便子进程知道没有更多数据通过。)

假设您想要捕获stdout但仅留下stdinstderr。同样,这很简单:只需调用proc.stdout.read()(或等效函数),直到没有更多输出。由于proc.stdout()是一个普通的Python I / O流,因此您可以使用它上面的所有常规构造,例如:

for line in proc.stdout:

或者,再次,你可以使用proc.communicate(),它只是为你做read()

如果你想只捕获stderr,它的工作原理与stdout相同。

在事情变得艰难之前还有一个技巧。假设您想要捕获stdout,并且还捕获stderr但是在与stdout相同的管道上:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在这种情况下,subprocess“作弊”!好吧,它必须这样做,所以它并没有真正作弊:它启动子进程,其stdout和stderr都指向(单个)管道描述符,反馈给它的父(Python)进程。在父端,只有一个管道描述符用于读取输出。所有“stderr”输出都显示在proc.stdout中,如果你调用proc.communicate(),stderr结果(元组中的第二个值)将是None,而不是字符串。

The hard cases: two or more pipes

当您想要使用至少两个管道时,所有问题都会出现。事实上,subprocess代码本身有这个:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

但是,唉,这里我们已经制作了至少两个,也许是三个不同的管道,所以count(None)返回1或0.我们必须以艰难的方式做事。

在Windows上,这使用threading.Thread来累积self.stdoutself.stderr的结果,并让父线程传递self.stdin输入数据(然后关闭管道)。

在POSIX上,这使用poll(如果可用),否则使用select来累积输出并传递stdin输入。所有这些都在(单个)父进程/线程中运行。

这里需要线程或轮询/选择以避免死锁。例如,假设我们已将所有三个流重定向到三个单独的管道。进一步假设在写入过程暂停之前,有多少数据可以填充到管道中,等待读取过程从另一端“清理”管道。我们将这个小限制设置为单个字节,仅用于说明。 (这实际上是如何工作的,除了限制远大于一个字节。)

如果父(Python)进程尝试写几个字节 - 比如'go\n'to proc.stdin,第一个字节进入,然后第二个字节导致Python进程暂停,等待子进程读取第一个字节,清空管道。

同时,假设子进程决定打印一个友好的“你好!不要恐慌!”问候。 H进入它的stdout管道,但是e导致它暂停,等待它的父母读取H,清空stdout管道。

现在我们陷入困境:Python进程处于睡眠状态,等待完成说“go”,子进程也处于睡眠状态,等待完成说“你好!不要恐慌!”。

subprocess.Popen代码通过线程或选择/轮询避免了这个问题。当字节可以越过管道时,它们就会消失。当它们不能时,只有一个线程(不是整个进程)必须休眠 - 或者在select / poll的情况下,Python进程同时等待“可写”或“数据可用”,写入进程的stdin只有当有空间时,才会在数据准备就绪时读取它的标准输出和/或标准输出。 proc.communicate()代码(实际上_communicate处理多毛的情况)返回所有stdin数据(如果有的话)已经发送并且所有stdout和/或stderr数据已经累积。

如果你想在两个不同的管道上读取stdoutstderr(无论任何stdin重定向),你也需要避免死锁。这里的死锁场景是不同的 - 当你从stderr中提取数据时,子进程向stdout写了一些长的东西,反之亦然 - 但它仍然在那里。


The Demo

我答应证明,未重定向的Python subprocesses写入底层标准输出,而不是sys.stdout。所以,这里有一些代码:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
    print 'start show1'
    save = sys.stdout
    sys.stdout = StringIO()
    print 'sys.stdout being buffered'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    in_stdout = sys.stdout.getvalue()
    sys.stdout = save
    print 'in buffer:', in_stdout

def show2():
    print 'start show2'
    save = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    print 'after redirect sys.stdout'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    sys.stdout = save

show1()
show2()

运行时:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

请注意,如果添加stdout=sys.stdout,第一个例程将失败,因为StringIO对象没有fileno。如果你添加hello,第二个将省略stdout=sys.stdout,因为sys.stdout已被重定向到os.devnull

(如果重定向Python的文件描述符-1,子进程将遵循该重定向.open(os.devnull, 'w')调用生成一个fileno()大于2的流。)


18
投票

我们也可以使用默认文件迭代器来读取stdout,而不是使用带有readline()的iter构造。

import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in process.stdout:
    sys.stdout.write(line)

10
投票

如果你能够使用第三方库,你可以使用像sarge这样的东西(披露:我是它的维护者)。该库允许对来自子进程的输出流进行非阻塞访问 - 它在subprocess模块上分层。


2
投票

一个好但“重量级”的解决方案是使用Twisted - 见底部。

如果你愿意只使用stdout那些东西应该工作:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

(如果你使用read(),它会尝试读取整个“文件”,这是无用的,我们真正可以使用的是读取管道中所有数据的东西)

人们也可能试图通过线程来解决这个问题,例如:

import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

现在我们可以通过两个线程来添加stderr。

但请注意,子进程文档不鼓励直接使用这些文件,并建议使用communicate()(主要关注死锁,我认为这不是上面的问题),并且解决方案有点笨拙,所以看起来子进程模块看起来不是很好工作(也见:http://www.python.org/dev/peps/pep-3145/),我们需要看看别的东西。

更复杂的解决方案是使用Twisted,如下所示:https://twistedmatrix.com/documents/11.1.0/core/howto/process.html

使用Twisted执行此操作的方法是使用reactor.spawnprocess()创建进程并提供ProcessProtocol,然后异步处理输出。 Twisted示例Python代码在这里:https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py


1
投票

看起来行缓冲输出对您有用,在这种情况下,类似下面的内容可能适用。 (警告:它未经测试。)这只会实时给出子进程的标准输出。如果你想实时拥有stderr和stdout,你将不得不用select做一些更复杂的事情。

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
    line = proc.stdout.readline()
    print line
    log_file.write(line + '\n')
# Might still be data on stdout at this point.  Grab any
# remainder.
for line in proc.stdout.read().split('\n'):
    print line
    log_file.write(line + '\n')
# Do whatever you want with proc.stderr here...

1
投票

为什么不直接将stdout设置为sys.stdout?如果你还需要输出到日志,那么你可以简单地覆盖f的write方法。

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

1
投票

我尝试的所有上述解决方案都无法分离stderr和stdout输出(多个管道)或者当OS管道缓冲区已满时永远被阻塞,这在您运行的命令输出太快时发生(在python上有此警告) poll()子流程手册)。我找到的唯一可靠的方法是通过select,但这只是一个posix解决方案:

import subprocess
import sys
import os
import select
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()

0
投票

这是我在其中一个项目中使用的课程。它将子进程的输出重定向到日志。起初我尝试简单地覆盖写入方法,但这不起作用,因为子进程永远不会调用它(重定向发生在filedescriptor级别)。所以我使用自己的管道,类似于在子进程模块中完成的管道。这样做的好处是可以将所有日志记录/打印逻辑封装在适配器中,您只需将记录器的实例传递给Popensubprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARN: self.log.warn,
            logging.ERROR: self.log.warn,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
       """If the write-filedescriptor is not closed this thread will
       prevent the whole program from exiting. You can use this method
       to clean up after the subprocess has terminated."""
       os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

如果您不需要日志记录但只是想使用print(),您显然可以删除大部分代码并使类更短。您还可以通过__enter____exit__方法扩展它,并在finished中调用__exit__,以便您可以轻松地将其用作上下文。

© www.soinside.com 2019 - 2024. All rights reserved.