如何在使用“stdin”时流式传输 Python 子进程输出

问题描述 投票:0回答:1

stdin=PIPE
subprocess.Popen()
一起使用时,是否可以在流式传输时读取
stdout
stderr

不使用

stdin=PIPE
时,我已成功使用从
stdout
stderr
读取输出,如此处所述。但是,现在我需要包含
stdin=PIPE
,似乎唯一的选择是等待该过程完成,然后使用从
p.communicate()
返回的元组。?

下面示例中所示的代码在使用

stdin=PIPE
之前可以工作,但现在失败并显示
ValueError: I/O operation on closed file.

这个方法还能用吗,或者是使用

out
err
中的
p.communicate()
的唯一方法?

from subprocess import Popen, PIPE, CalledProcessError, TimeoutExpired


def log_subprocess_output(pipe, func=logger.info) -> None:
    '''Log subprocess output from a pipe.'''

    for line in pipe:
        func(line.decode().rstrip())


try:
    p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, text=True)
    out, err = p.communicate(input=input, timeout=10)

    # This worked before using `stdin=PIPE`.
    # Can this method still be used somehow, or is the only way to use `out` and `err` from `p.communicate()`?
    with p.stdout:
        log_subprocess_output(p.stdout)
    with p.stderr:
        log_subprocess_output(p.stderr, func=logger.error)

except TimeoutExpired as e:
    logger.error(f'My process timed out: {e}')
    p.kill()

except CalledProcessError as e:
    logger.error(f'My process failed: {e}')

except Exception as e:
    logger.error(f'An exception occurred: {e}')
python subprocess
1个回答
0
投票

感谢@Norhther的建议

threading

这是一个基本的工作示例(可能可以改进),它使用

p.communicate()
运行一个进程来传递输入 do
stdin
,而线程读取
stdout
并打印它找到的任何内容。

此示例将

stderr
重定向到
stdout
,而实际上我将有 2 个线程分别查看这些流并记录到相关的日志级别(例如
logging.info()
logging.error()
)。

from subprocess import CalledProcessError, TimeoutExpired, Popen, PIPE, STDOUT
import threading

def invoke_subporcess(args: list, input: str=None, timeout: int=300) -> int:

    def log_subprocess_output(stream) -> None:
        '''Log subprocess output from a pipe.'''
        
        while True:
            line = stream.readline()
            if not line:
                break
            print(f'[INFO] {line.strip()}')

    try:
        # Configure the container image action.
        p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=STDOUT, text=True)

        # Start threads to read stdout and stderr.
        thread1 = threading.Thread(target=log_subprocess_output,
            name='read_stdout', kwargs={'stream': p.stdout})
        thread1.start()

        # Start the container image action.
        p.communicate(input=input, timeout=timeout)

        # Join the threads that read stdout and stderr.
        thread1.join(timeout=timeout)
        
        # Check the exit code and raise an error if necessary.
        if p.returncode > 0:
            raise CalledProcessError(p.returncode, ' '.join(p.args))
        
        return p.returncode
        
    except TimeoutExpired as e:
        print(f'[Error] My process timed out: {e}')
        p.kill()
        p.communicate()

    except CalledProcessError as e:
        print(f'[Error] My process failed: {e}')

    except Exception as e:
        print(f'[Error] An exception occurred: {e}')

if __name__ == "__main__":
    
    password = "super-secret-password"
    url = "my-container-registry.com"
    args = ['docker', 'login', url, '--username', 'AWS', '--password-stdin']

    invoke_subporcess(args, input=password)
© www.soinside.com 2019 - 2024. All rights reserved.