需求:
Python 3 subprocess 模块内置了 timeout ,我自己也尝试过使用计时器和线程实现超时,但它不适用于输出。
readline()
是否阻塞? readlines()
肯定是在等待进程结束才吐出所有输出,这不是我需要的(我需要持续)。
我即将切换到 Node.js :-(
我会使用 asyncio 来完成此类任务。
从进程中读取 IO,做法见下面这个问题的已接受答案(accepted answer): 如何使用 asyncio 从子进程流式传输 stdout/stderr,并在之后获取其退出代码?
(我不想在这里完全复制)
将其包裹在超时中:
async def killer(trans, timeout):
    """Kill the subprocess behind *trans* once *timeout* seconds have passed.

    Args:
        trans: Subprocess transport exposing ``get_returncode()`` and ``kill()``.
        timeout: Number of seconds to wait before killing.

    Nothing is killed (and nothing is printed) when the process has already
    finished by the time the timeout elapses.
    """
    await asyncio.sleep(timeout)
    if trans.get_returncode() is not None:
        # The child exited on its own; there is nothing left to kill.
        return
    try:
        trans.kill()
    except ProcessLookupError:
        # The process disappeared between the check and the kill.
        return
    print ('killed!!')
# Start the child process through the event loop and keep the first item of
# the result (used below as the transport); the rest is collected but unused.
# NOTE(review): `loop` and `SubprocessProtocol` are not defined in this
# snippet - presumably they come from the referenced answer; confirm.
# The child sleeps for 6 seconds before printing, so the 5-second killer
# scheduled below fires first.
trans, *other_stuff = loop.run_until_complete(
loop.subprocess_exec(
SubprocessProtocol, 'py', '-3', '-c', 'import time; time.sleep(6); print("Yay!")' ,
)
)
# Schedule the killer coroutine on the loop, then run the loop so both the
# subprocess I/O and the killer make progress.
asyncio.ensure_future(killer(trans, 5)) # 5 seconds timeout for the kill
loop.run_forever()
玩得开心...
使用下面的 2 个 python 脚本。
Master.py将使用
Popen
启动一个新进程,并将启动一个观察者线程,该线程将在3.0
秒后终止该进程。
如果写入
stdout
的数据中没有换行符,Slave.py 必须调用 flush 方法(在 Windows 上 '\n'
也会导致刷新)。
注意:
time 模块不是高精度定时器。
在极端情况下(从具有 USB 1.0 的闪存驱动器读取可执行文件),进程的加载时间可能会超过 3.0 秒
Master.py
import subprocess, threading, time
def watcher(proc, delay):
    """Wait *delay* seconds, then kill *proc*.

    Meant to run in its own thread so the caller can keep reading the
    process output while the deadline is pending.

    Args:
        proc: Process object exposing a ``kill()`` method.
        delay: Seconds to wait before killing.
    """
    # Block this thread only; the kill happens once the delay is over.
    time.sleep(delay)
    proc.kill()
# Launch the slave with an argument list: without shell=True a single command
# string is treated as the program name on POSIX and the launch fails there.
proc = subprocess.Popen(['python', 'Slave.py'], stdout = subprocess.PIPE)
# The watcher thread kills the child after 3.0 seconds.
threading.Thread(target = watcher, args = (proc, 3.0)).start()
data = bytearray()
# Read until EOF; an empty chunk means the child's stdout was closed, which
# happens once the watcher has killed it (or it exited by itself).
while True:
    chunk = proc.stdout.read(1)
    if not chunk:
        break
    data.extend(chunk)
# Reap the child so it does not linger as a zombie.
proc.wait()
print(data)
Slave.py
import time, sys
# Emit a newline-free chunk about ten times per second, forever. Because the
# chunk contains no newline, it is flushed explicitly after every write.
while True:
    time.sleep(0.1)
    out = sys.stdout
    out.write('aaaa')
    out.flush()
使用 subprocess.run()
并传入 capture_output=True
和 timeout=<your_timeout>
。如果命令在 <your_timeout>
秒过去之前没有返回,它将终止进程并引发 subprocess.TimeoutExpired
异常,该异常将具有 .stdout
和 .stderr
属性:
import subprocess
# Run a command that outlives its 2-second timeout and show what was captured
# before the process was terminated.
command = ["sleep", "3"]
try:
    result = subprocess.run(command, timeout=2, capture_output=True)
except subprocess.TimeoutExpired as expired:
    print("process timed out")
    # The exception carries whatever output was collected before the timeout.
    for captured in (expired.stdout, expired.stderr):
        print(captured)
您可能还想传递
text=True
(在 Python <3.7 上则使用 universal_newlines=True
),这样 stdout
和 stderr
是 str
而不是 bytes
。
在旧版本的 Python 上,您需要在调用
subprocess.run()
时将 capture_output=True
替换为 stdout=subprocess.PIPE, stderr=subprocess.PIPE
,其余部分应该相同。
编辑:这不是你想要的,因为你需要等待进程终止才能读取输出,但这正是我遇到这个问题时想要的。
我刚刚发布了这篇文章,其中包含一个测试应用程序运行程序的示例,该运行程序等待应用程序的输出(无论该输出是在
stdout
还是stderr
上)并在超时时引发异常。
示例测试应用程序:
# app/test_app.py
import os
import sys
import time
# Marker text main() prints once startup is done; taken from the
# APP_READY_STRING environment variable, with a placeholder when it is unset.
APP_READY_STRING = os.environ.get("APP_READY_STRING", "No 'App Ready' string provided")
# Seconds main() sleeps after printing the ready string, before it exits.
POST_STARTUP_RUN_TIME = 5
# Number of one-second waits main() performs before printing the ready string.
STARTUP_DELAY = 5
def main():
    """Simulate an app with a slow startup that reports readiness on stderr.

    Prints greetings to stdout and stderr, waits STARTUP_DELAY seconds while
    logging progress, prints APP_READY_STRING to stderr, keeps running for
    POST_STARTUP_RUN_TIME seconds and then says goodbye.
    """
    # log to stdout and stderr in order to test that both can be captured.
    # stdout is block-buffered when it is a pipe, so flush explicitly:
    # otherwise the lines only reach the reader at exit, and are lost
    # entirely if the app is killed first.
    print("Hello World!", flush=True)
    print("And hello again to you, sir!", file=sys.stderr)

    # simulate startup delay
    for i in range(STARTUP_DELAY):
        print(f"{time.strftime('%H:%M:%S')} Test app waiting... {i+1}", flush=True)
        time.sleep(1)

    # print out the string that's being tested for. it should not matter whether
    # this is printed to stdout or stderr
    print(APP_READY_STRING, flush=True, file=sys.stderr)

    # keep running for POST_STARTUP_RUN_TIME seconds before exiting, this will
    # give enough time to test that killing the app works
    time.sleep(POST_STARTUP_RUN_TIME)
    print("Goodbye World!")


if __name__ == "__main__":
    main()
app_runner
模块:
# app_runner/app_runner.py
import os
import subprocess
import sys
from .processes import kill_process, wait_for_process_output
# Text handed to the app through the APP_READY_STRING environment variable and
# then searched for in the app's output to detect a successful start.
APP_READY_STRING = "App started successfully..."
class AppRunner(object):
    """Run an app in a separate process and wait until it reports readiness.

    Usable as a context manager: leaving the ``with`` block kills the app.
    """

    def __init__(self, app: str, cwd: str):
        """Start the app and block until its ready string shows up.

        Args:
            app (str): The name of the app to run.
            cwd (str): The path where the app is located.

        Raises:
            Exception: If waiting for the ready string fails; the original
                error is both the second argument and the ``__cause__``.
        """
        self.app = app
        env = {
            **os.environ,
            "APP_READY_STRING": APP_READY_STRING,
        }
        cmd = [
            sys.executable,
            self.app,
        ]
        # start the app in a separate process. it's important that the stdout and
        # stderr streams are captured so that they can be checked for the expected
        # output, and that the app isn't run with a shell
        self.process = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            wait_for_process_output(self.process, APP_READY_STRING)
        except Exception as e:
            # __exit__ never runs when __init__ raises, so the child has to be
            # stopped here or it would keep running after the failure.
            self._stop_process()
            raise Exception(
                f"AppRunner app '{self.app}' failed to start", e
            ) from e

    def __enter__(self):
        """Return the runner itself; the app is already started by __init__."""
        return self

    def __exit__(self, *args):
        """Kill the app when the ``with`` block is left."""
        kill_process(self.app)
        # Name-based matching is heuristic; make sure the child this runner
        # owns is really gone and reaped.
        self._stop_process()

    def _stop_process(self) -> None:
        """Kill the owned child if it is still running and reap it."""
        if self.process.poll() is None:
            self.process.kill()
        self.process.wait()
processes
模块:
# app_runner/processes.py
import random
import string
import sys
import threading
import time
from typing import List, Union
import psutil
# Default number of seconds to wait for the expected text before giving up.
DEFAULT_APP_WAIT_TIME_SEC = 8
# This dictionary is used to store the state of the processes that are being
# searched. Keys are the random handles created by wait_for_process_output;
# each value holds the "text_found" and "timed_out" threading.Event objects.
processes = {}
def _is_process_match(command: str, process_names: List[str]) -> bool:
"""Identifying a process by its command line is not an exact science."""
if len(process_names) == 1:
command_parts = command.split(" ")
if command_parts[0] == process_names[0]:
return True
if len(process_names) > 1 and all(
[process_name in command for process_name in process_names]
):
return True
return False
def _find_in_stream(stream, text: str, process_handle: str) -> None:
    """Read *stream* line by line until *text* is seen or the search ends.

    Every line read is echoed through _print_log. The "text_found" event of
    the process entry is set as soon as a line contains *text*.

    Args:
        stream: Binary stream with a ``readline()`` method.
        text: Text to look for in each decoded line.
        process_handle: Key of the entry in ``processes`` whose "text_found"
            and "timed_out" events control and report the search.
    """
    state = processes[process_handle]
    while not state["text_found"].is_set() and not state["timed_out"].is_set():
        try:
            raw_line = stream.readline()
        except (OSError, ValueError):
            # The stream was closed or became unreadable; nothing more to find.
            break
        if not raw_line:
            # EOF: readline() would keep returning b"" immediately, so stop
            # instead of spinning until the timeout.
            break
        # Undecodable bytes must not abort the search for the ready text.
        line = raw_line.decode("utf-8", errors="replace")
        if text in line:
            state["text_found"].set()
        _print_log(line)
def _print_log(line: str) -> None:
line = f"{time.strftime('%H:%M:%S')} {line}\n"
sys.stderr.write(line)
sys.stderr.flush()
def _print_process_identifier(proc_name: str, cmd_line: str, process_names: List[str]):
return f"process '{proc_name}' (looking for {','.join(process_names)}) with command line '{cmd_line}'"
def _process_timeout(process_handle, timeout=DEFAULT_APP_WAIT_TIME_SEC) -> bool:
    """Flag the search as timed out unless the text is found in time.

    Waits on the "text_found" event of the process entry instead of polling
    in one-second sleeps, so it returns as soon as the text is found and
    honours fractional timeouts.

    Args:
        process_handle: Key of the entry in ``processes`` to watch.
        timeout: Maximum number of seconds to wait for the text.

    Returns:
        True when the timeout expired and "timed_out" was set, False when
        the text was found first.
    """
    _print_log(
        f"Waiting up to {timeout} seconds to abort search on process {process_handle}..."
    )
    state = processes[process_handle]
    if state["text_found"].wait(timeout):
        return False
    state["timed_out"].set()
    return True
def _random_string(length: int) -> str:
"""Naive random string generator to create process identifiers."""
return "".join(random.choice(string.ascii_lowercase) for _ in range(length))
def wait_for_process_output(
    process, text: str, timeout=DEFAULT_APP_WAIT_TIME_SEC
) -> None:
    """Block until *text* appears in the process output or the timeout expires.

    Both the stdout and the stderr stream of *process* are searched, each in
    its own thread, while a third thread enforces the timeout.

    Args:
        process: Process object whose ``stdout`` and ``stderr`` attributes
            are readable streams.
        text: Text to look for in the output.
        timeout: Maximum number of seconds to wait.

    Raises:
        Exception: If the text was not found before the timeout expired.
    """
    start_time = time.monotonic()
    process_handle = _random_string(10)
    state = {
        "text_found": threading.Event(),
        "timed_out": threading.Event(),
    }
    processes[process_handle] = state

    workers = (
        # stops the search after the timeout
        (_process_timeout, (process_handle, timeout)),
        # search for the text in the stdout and stderr streams
        (_find_in_stream, (process.stdout, text, process_handle)),
        (_find_in_stream, (process.stderr, text, process_handle)),
    )
    for target, args in workers:
        # Daemon threads: a reader blocked on a silent stream must not keep
        # the interpreter alive at exit.
        threading.Thread(target=target, args=args, daemon=True).start()

    # Sleep on the event instead of spinning; wake up regularly to notice
    # the timeout flag.
    while not state["text_found"].wait(0.05):
        if state["timed_out"].is_set() and not state["text_found"].is_set():
            raise Exception(
                f"Failed to find '{text}' in process output after {time.monotonic() - start_time} seconds."
            )
def kill_process(process_names: Union[str, List[str]]) -> None:
    """Kill a Python process identified by the given name or list of names.

    There are easier ways to do this, but this is the most reliable way to kill a
    Python-run process without knowing the exact command line arguments and without
    killing the current process / test runner process (eg. nox).

    Args:
        process_names: A single name, or a list of names that must all occur
            in the command of the process to kill.

    Processes that vanish, are zombies or may not be inspected are reported
    and skipped.
    """
    import os  # local import: only needed for the path handling below

    if isinstance(process_names, str):
        process_names = [process_names]
    # Kill all processes with the given name
    for proc in psutil.process_iter(attrs=["pid", "name", "cmdline"], ad_value=None):
        # Reset per process so an error message never describes a previous one.
        proc_name = "undefined"
        cmd_line = "undefined"
        try:
            proc_name = proc.name()
            if proc.status() == psutil.STATUS_ZOMBIE:
                continue
            # Some apps run under their own names, some as `Python` (this also
            # depends on the OS)
            if _is_process_match(proc_name, process_names):
                print(f"Killing process with name {proc_name}...")
                proc.kill()
            elif proc_name.lower().startswith("python"):
                cmd_parts = proc.cmdline()
                cmd_line = " ".join(cmd_parts)
                # drop the first argument, which is the python executable
                python_command_parts = cmd_parts[1:]
                if not python_command_parts:
                    # A bare interpreter without arguments cannot be the app.
                    continue
                # the initial command part is the last part of the path
                python_command_parts[0] = os.path.basename(python_command_parts[0])
                # combine the remaining arguments
                command = " ".join(python_command_parts)
                print(
                    f"Evaluating process with name '{proc_name}' and command '{command}'..."
                )
                if (
                    "nox" not in command  # don't kill the test runner process
                    and _is_process_match(command, process_names)
                ):
                    print(
                        f"Killing process with name '{proc_name}' and command '{command}'..."
                    )
                    proc.kill()
        except psutil.ZombieProcess as zp:
            print(
                f"Failed to kill zombie process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(zp)}"
            )
        except psutil.NoSuchProcess as nsp:
            print(
                f"Failed to kill process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(nsp)}"
            )
        except psutil.AccessDenied as ad:
            # Processes of other users may not be inspected; skip them
            # instead of aborting the whole scan.
            print(
                f"Access denied for process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(ad)}"
            )
把它们放在一起:
# test_app_runner.py
from pathlib import Path
from app_runner.app_runner import AppRunner
def main():
    """Start the test app through AppRunner and report each stage.

    The app is looked up in the ``app`` directory next to this file.
    """
    print("Starting AppRunner test...")
    app_dir = Path(__file__).parent / "app"
    # Entering the block only succeeds once the runner has seen the app's
    # ready string; leaving it stops the app again.
    with AppRunner(app="test_app.py", cwd=app_dir):
        print("Test app start detected.")
    print("AppRunner test complete.")


if __name__ == "__main__":
    main()