将
stdin=PIPE
与 subprocess.Popen()
一起使用时,是否可以在流式传输时读取 stdout
和 stderr
?
不使用
stdin=PIPE
时,我已成功使用从 stdout
和 stderr
读取输出,如此处所述。但是,现在我需要包含 stdin=PIPE
,似乎唯一的选择是等待该过程完成,然后使用从 p.communicate()
返回的元组。?
下面示例中所示的代码在使用
stdin=PIPE
之前可以工作,但现在失败并显示 ValueError: I/O operation on closed file.
。
这个方法还能用吗,或者是使用
out
和 err
中的 p.communicate()
的唯一方法?
from subprocess import Popen, PIPE, CalledProcessError, TimeoutExpired
def log_subprocess_output(pipe, func=logger.info) -> None:
'''Log subprocess output from a pipe.'''
for line in pipe:
func(line.decode().rstrip())
try:
p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, text=True)
out, err = p.communicate(input=input, timeout=10)
# This worked before using `stdin=PIPE`.
# Can this method still be used somehow, or is the only way to use `out` and `err` from `p.communicate()`?
with p.stdout:
log_subprocess_output(p.stdout)
with p.stderr:
log_subprocess_output(p.stderr, func=logger.error)
except TimeoutExpired as e:
logger.error(f'My process timed out: {e}')
p.kill()
except CalledProcessError as e:
logger.error(f'My process failed: {e}')
except Exception as e:
logger.error(f'An exception occurred: {e}')
感谢@Norhther的建议
threading
这是一个基本的工作示例(可能可以改进),它使用
p.communicate()
运行一个进程来传递输入 do stdin
,而线程读取 stdout
并打印它找到的任何内容。
此示例将
stderr
重定向到 stdout
,而实际上我将有 2 个线程分别查看这些流并记录到相关的日志级别(例如 logging.info()
或 logging.error()
)。
from subprocess import CalledProcessError, TimeoutExpired, Popen, PIPE, STDOUT
import threading
def invoke_subporcess(args: list, input: str=None, timeout: int=300) -> int:
def log_subprocess_output(stream) -> None:
'''Log subprocess output from a pipe.'''
while True:
line = stream.readline()
if not line:
break
print(f'[INFO] {line.strip()}')
try:
# Configure the container image action.
p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=STDOUT, text=True)
# Start threads to read stdout and stderr.
thread1 = threading.Thread(target=log_subprocess_output,
name='read_stdout', kwargs={'stream': p.stdout})
thread1.start()
# Start the container image action.
p.communicate(input=input, timeout=timeout)
# Join the threads that read stdout and stderr.
thread1.join(timeout=timeout)
# Check the exit code and raise an error if necessary.
if p.returncode > 0:
raise CalledProcessError(p.returncode, ' '.join(p.args))
return p.returncode
except TimeoutExpired as e:
print(f'[Error] My process timed out: {e}')
p.kill()
p.communicate()
except CalledProcessError as e:
print(f'[Error] My process failed: {e}')
except Exception as e:
print(f'[Error] An exception occurred: {e}')
if __name__ == "__main__":
password = "super-secret-password"
url = "my-container-registry.com"
args = ['docker', 'login', url, '--username', 'AWS', '--password-stdin']
invoke_subporcess(args, input=password)