Jupyter 笔记本中 Python 子进程的实时标准输出输出

问题描述 投票:0回答:6

我正在使用 subprocess 从 Python (3.5.2) 脚本运行命令行程序,该脚本在 Jupyter 笔记本中运行。子进程需要很长时间才能运行,因此我希望将其标准输出实时打印到 Jupyter 笔记本的屏幕上。

我可以在从终端运行的普通 Python 脚本中毫无问题地做到这一点。我这样做使用:

def run_command(cmd):
from subprocess import Popen, PIPE
import shlex

with Popen(shlex.split(cmd), stdout=PIPE, bufsize=1, universal_newlines=True) as p:
    for line in p.stdout:
        print(line, end='')
    exit_code = p.poll()
return exit_code

但是,当我在 Jupyter 笔记本中运行脚本时,它不会将标准输出实时打印到屏幕上。相反,它会在子进程完成运行后打印所有内容。

有人知道如何解决这个问题吗?

非常感谢, 约翰尼

python subprocess ipython stdout jupyter
6个回答
34
投票

ipython 笔记本有自己的支持运行 shell 命令。如果您不需要使用子流程进行捕获,您可以这样做

cmd = 'ls -l'
!{cmd}

使用 ! 执行的命令的输出自动通过笔记本传输。


4
投票

如果您设置

stdout = None
(这是默认设置,因此您可以完全省略
stdout
参数),那么您的进程应将其输出写入运行 IPython 笔记本服务器的终端。

发生这种情况是因为子进程的默认行为是从父文件处理程序继承(请参阅docs)。

您的代码将如下所示:

from subprocess import Popen, PIPE
import shlex

def run_command(cmd):
    p = Popen(shlex.split(cmd), bufsize=1, universal_newlines=True)
    return p.poll()

这不会打印到浏览器中的笔记本,但至少您可以在其他代码运行时异步看到子进程的输出。

希望这有帮助。


4
投票

Jupyter 搞砸了 stdout 和 stderr。 这应该得到你想要的,并在命令启动失败时给你一个更有用的异常。

import signal
import subprocess as sp


class VerboseCalledProcessError(sp.CalledProcessError):
    def __str__(self):
        if self.returncode and self.returncode < 0:
            try:
                msg = "Command '%s' died with %r." % (
                    self.cmd, signal.Signals(-self.returncode))
            except ValueError:
                msg = "Command '%s' died with unknown signal %d." % (
                    self.cmd, -self.returncode)
        else:
            msg = "Command '%s' returned non-zero exit status %d." % (
                self.cmd, self.returncode)

        return f'{msg}\n' \
               f'Stdout:\n' \
               f'{self.output}\n' \
               f'Stderr:\n' \
               f'{self.stderr}'


def bash(cmd, print_stdout=True, print_stderr=True):
    proc = sp.Popen(cmd, stderr=sp.PIPE, stdout=sp.PIPE, shell=True, universal_newlines=True,
                    executable='/bin/bash')

    all_stdout = []
    all_stderr = []
    while proc.poll() is None:
        for stdout_line in proc.stdout:
            if stdout_line != '':
                if print_stdout:
                    print(stdout_line, end='')
                all_stdout.append(stdout_line)
        for stderr_line in proc.stderr:
            if stderr_line != '':
                if print_stderr:
                    print(stderr_line, end='', file=sys.stderr)
                all_stderr.append(stderr_line)

    stdout_text = ''.join(all_stdout)
    stderr_text = ''.join(all_stderr)
    if proc.wait() != 0:
        raise VerboseCalledProcessError(proc.returncode, cmd, stdout_text, stderr_text)

1
投票

用显式

readline()
调用替换 for 循环对我有用。

from subprocess import Popen, PIPE
import shlex

def run_command(cmd):
    with Popen(shlex.split(cmd), stdout=PIPE, bufsize=1, universal_newlines=True) as p:
        while True:
            line = p.stdout.readline()
            if not line:
                break
            print(line)    
        exit_code = p.poll()
    return exit_code

即使 4 年后,他们的迭代器仍然存在一些问题。


0
投票

使用

subprocess.check_output
功能:

>>> subprocess.check_output(['echo', 'foobar'])
b'foobar\n'

对于 Python 3,您将返回一个可以解码的字节对象:

>>> b=subprocess.check_output(['echo', 'foobar'])
>>> b.decode().strip()

'foobar'


0
投票

如果您想分别处理 stdout 和 stderr,您可以生成两个并发处理它们的线程(在生成输出时实时运行)。这适用于 Jupyter 笔记本以及普通的 Python 解释器/脚本。

改编自我更详细的答案

import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen


def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time."""
    with (
        Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process,
        ThreadPoolExecutor(2) as pool,  # two threads to handle the (live) streams separately
    ):
        exhaust = partial(deque, maxlen=0)  # collections recipe: exhaust an iterable at C-speed
        exhaust_async = partial(pool.submit, exhaust)  # exhaust non-blocking in a background thread
        exhaust_async(stdout_handler(line[:-1]) for line in process.stdout)
        exhaust_async(stderr_handler(line[:-1]) for line in process.stderr)
    retcode = process.poll()  # block until both iterables are exhausted (process finished)
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)

使用简单的

print
处理程序进行调用:

stream_command(["echo", "test"], stdout_handler=print, stderr_handler=print)
# test

或者使用自定义处理程序:

outs, errs = [], []
def stdout_handler(line):
    outs.append(line)
    print(line)
def stderr_handler(line):
    errs.append(line)
    print(line)

stream_command(
    ["echo", "test"],
    stdout_handler=stdout_handler,
    stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']
© www.soinside.com 2019 - 2024. All rights reserved.