Python subprocess readline with timeout


I have a small problem that I'm not sure how to solve. Here is a minimal example:

What I have

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    some_criterium = do_something(line)

What I want

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    if nothing_happens_after_10s:
        break
    else:
        some_criterium = do_something(line)

I read a line from a subprocess and do something with it. How can I exit if no line arrives within a fixed time interval?

python timeout subprocess
8 Answers
35
votes

Thanks for all the answers!

I found a way to solve my problem by simply using select.poll to peek into standard output.

import select
import subprocess
...
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
poll_obj = select.poll()
poll_obj.register(scan_process.stdout, select.POLLIN)
while some_criterium and not time_limit:
    # poll(0) returns immediately; a non-empty result means stdout has data
    poll_result = poll_obj.poll(0)
    if poll_result:
        line = scan_process.stdout.readline()
        some_criterium = do_something(line)
    update(time_limit)
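
Since poll(0) returns immediately, the loop above spins while it waits. Here is a minimal sketch of a blocking variant, assuming a Unix system (select.poll is not available on Windows). The name iter_lines_with_timeout and the 4096-byte chunk size are illustrative and not part of the answer. It reads from the file descriptor with os.read instead of readline(), because readline() can pull several lines into Python's buffer and leave poll() waiting on a descriptor that looks empty:

```python
import os
import select
import subprocess


def iter_lines_with_timeout(command, timeout_s):
    """Yield lines (bytes) from command's output; stop after timeout_s of silence."""
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    fd = proc.stdout.fileno()
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    pending = b""  # bytes received that do not yet form a complete line
    try:
        while True:
            # poll() takes milliseconds and blocks until data, hang-up, or timeout
            if not poller.poll(int(timeout_s * 1000)):
                return  # nothing arrived in time
            chunk = os.read(fd, 4096)
            if not chunk:  # EOF: the child closed its end of the pipe
                if pending:
                    yield pending
                return
            pending += chunk
            *lines, pending = pending.split(b"\n")
            for line in lines:
                yield line + b"\n"
    finally:
        # Whatever the reason for stopping, do not leave the child running
        proc.kill()
        proc.wait()
        proc.stdout.close()
```

The caller can then write `for line in iter_lines_with_timeout(command, 10): ...` and break out of the loop when its own criterion fails. The timer restarts whenever any data arrives, so a child that trickles partial lines is not treated as silent.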

32
votes

Here is a portable solution that uses asyncio to enforce a timeout on reading a single line:

#!/usr/bin/env python3
import asyncio
from asyncio.subprocess import PIPE, STDOUT

async def run_command(*args, timeout=None):
    # Start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec(*args,
            stdout=PIPE, stderr=STDOUT)

    # Read line (sequence of bytes ending with b'\n') asynchronously
    while True:
        try:
            line = await asyncio.wait_for(process.stdout.readline(), timeout)
        except asyncio.TimeoutError:
            pass
        else:
            if not line: # EOF
                break
            elif do_something(line):
                continue # While some criterion is satisfied
        process.kill() # Timeout or some criterion is not satisfied
        break
    return await process.wait() # Wait for the child process to exit


# asyncio.run() creates and closes the event loop. Since Python 3.8 the
# default loop on Windows is the ProactorEventLoop, which supports pipes.
returncode = asyncio.run(run_command("cmd", "arg 1", "arg 2", timeout=10))

14
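votes

I have used something a bit more general in Python (if I remember correctly, it was also pieced together from Stack Overflow questions, but I can't recall which ones).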

import _thread  # this module is named `thread` on Python 2
from threading import Timer

def run_with_timeout(timeout, default, f, *args, **kwargs):
    if not timeout:
        return f(*args, **kwargs)
    # Raise KeyboardInterrupt in the main thread once the timeout expires
    timeout_timer = Timer(timeout, _thread.interrupt_main)
    try:
        timeout_timer.start()
        result = f(*args, **kwargs)
        return result
    except KeyboardInterrupt:
        return default
    finally:
        timeout_timer.cancel()

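Be careful, though. This uses an interrupt to stop whatever function you give it. That may not be a good idea for every function, and it also prevents you from closing the program with Ctrl + C during the timeout (i.e. Ctrl + C will be handled as a timeout).

You could use it like this: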

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = run_with_timeout(timeout, None, scan_process.stdout.readline)
    if line is None:
        break
    else:
        some_criterium = do_something(line)

It might be overkill, though. I suspect there is a simpler option for your case that I don't know about.


9
votes

While Tom's solution works, using select() in the C idiom is more compact. This is the equivalent of your answer:

import subprocess
from select import select
scan_process = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                bufsize=1)  # Line buffered
while some_criterium and not time_limit:
    # The first element of the result lists the descriptors ready for reading
    poll_result = select([scan_process.stdout], [], [], time_limit)[0]

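The rest is the same.

See pydoc select.select.

[Note: this is Unix-specific, as are some of the other answers.]

[Note 2: edited to add line buffering at the OP's request]

[Note 3: line buffering may not be reliable in all circumstances, which can leave readline() blocked]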
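
One way around the blocking risk in Note 3 is to skip readline() altogether and assemble lines from raw reads. What follows is only a rough sketch, assuming Unix; read_line, its pending buffer, and the 4096-byte chunk size are hypothetical names and values, not part of the answer. Unlike a plain select() call per read, it enforces one deadline for the whole line, so a child that emits a few bytes at a time cannot extend the wait indefinitely:

```python
import os
import select
import time


def read_line(fd, pending, timeout_s):
    """Read one line from fd, waiting at most timeout_s seconds in total.

    Returns (line, pending). line is None on timeout and b"" at EOF;
    pending holds bytes already read that belong to later lines.
    """
    deadline = time.monotonic() + timeout_s
    while b"\n" not in pending:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None, pending
        ready, _, _ = select.select([fd], [], [], remaining)
        if not ready:
            return None, pending  # select() timed out
        chunk = os.read(fd, 4096)
        if not chunk:
            # EOF: hand back any unterminated final line, then b"" next time
            return pending, b""
        pending += chunk
    line, _, pending = pending.partition(b"\n")
    return line + b"\n", pending
```

It would be called with `fd = scan_process.stdout.fileno()` and an initially empty `pending`, passing the returned `pending` back in on each call.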


5
votes

A portable solution is to use a thread to kill the child process if reading a line takes too long:

#!/usr/bin/env python3
from subprocess import Popen, PIPE, STDOUT

timeout = 10
with Popen(command, stdout=PIPE, stderr=STDOUT,
           universal_newlines=True) as process:  # text mode
    # kill process in timeout seconds unless the timer is restarted
    watchdog = WatchdogTimer(timeout, callback=process.kill, daemon=True)
    watchdog.start()
    for line in process.stdout:
        # don't invoke the watchdog callback if do_something() takes too long
        with watchdog.blocked:
            if not do_something(line):  # some criterium is not satisfied
                process.kill()
                break
            watchdog.restart()  # restart timer just before reading the next line
    watchdog.cancel()

where the WatchdogTimer class is like threading.Timer, except that it can be restarted and/or blocked:

from threading import Event, Lock, Thread
from subprocess import Popen, PIPE, STDOUT
from time import monotonic  # use time.time or monotonic.monotonic on Python 2

class WatchdogTimer(Thread):
    """Run *callback* in *timeout* seconds unless the timer is restarted."""

    def __init__(self, timeout, callback, *args, timer=monotonic, **kwargs):
        super().__init__(**kwargs)
        self.timeout = timeout
        self.callback = callback
        self.args = args
        self.timer = timer
        self.cancelled = Event()
        self.blocked = Lock()

    def run(self):
        self.restart() # don't start timer until `.start()` is called
        # wait until timeout happens or the timer is canceled
        while not self.cancelled.wait(self.deadline - self.timer()):
            # don't test the timeout while something else holds the lock
            # allow the timer to be restarted while blocked
            with self.blocked:
                if self.deadline <= self.timer() and not self.cancelled.is_set():
                    return self.callback(*self.args)  # on timeout

    def restart(self):
        """Restart the watchdog timer."""
        self.deadline = self.timer() + self.timeout

    def cancel(self):
        self.cancelled.set()

5
votes

Try using signal.alarm:

# timeout.py
import signal, sys

def timeout(sig, frm):
  print("This is taking too long...")
  sys.exit(1)

signal.signal(signal.SIGALRM, timeout)
signal.alarm(10)
byte = 0

# Binary mode: /dev/urandom yields arbitrary bytes that may not decode as text
while b'IT' not in open('/dev/urandom', 'rb').read(2):
  byte += 2
print("I got IT in %s byte(s)!" % byte)

Run it a few times to show that it works:

$ python timeout.py 
This is taking too long...
$ python timeout.py 
I got IT in 4672 byte(s)!

For a more detailed example, see pGuides.
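
The same idea can be applied to the readline() loop from the question. This is a sketch under several assumptions: Unix only, called from the main thread (signal.signal refuses to run elsewhere), and a whole number of seconds for the timeout because signal.alarm takes an integer. LineTimeout and iter_lines_with_alarm are made-up names for illustration. There is also a small window in which the alarm could fire just after readline() returns, so treat it as a starting point, not a hardened implementation:

```python
import signal
import subprocess


class LineTimeout(Exception):
    """Raised by the SIGALRM handler when a line takes too long."""


def _on_alarm(signum, frame):
    raise LineTimeout


def iter_lines_with_alarm(command, timeout_s):
    """Yield lines (bytes) from command's output; stop if one takes > timeout_s."""
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    old_handler = signal.signal(signal.SIGALRM, _on_alarm)
    try:
        while True:
            signal.alarm(timeout_s)  # arm the alarm for this line only
            try:
                line = proc.stdout.readline()
            except LineTimeout:
                return  # no complete line arrived in time
            finally:
                signal.alarm(0)  # disarm so the caller's code is not interrupted
            if not line:  # EOF
                return
            yield line
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)  # restore the previous handler
        proc.kill()
        proc.wait()
        proc.stdout.close()
```

Because the alarm is disarmed before each line is handed to the caller, time spent processing a line does not count against the timeout.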


1
votes

Use threads:

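import subprocess, threading, time

def _watcher(proc, delay):
    # Kill the child after `delay` seconds so a blocked readline() sees EOF.
    # This limits the total run time; it is not a per-line timeout.
    time.sleep(delay)
    proc.kill()

try:

    scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # daemon=True so the sleeping watcher does not keep the program alive
    threading.Thread(target=_watcher, args=(scan_process, 10), daemon=True).start()

    while some_criterium:
        line = scan_process.stdout.readline()
        if not line:  # EOF: the child exited or the watcher killed it
            break
        some_criterium = do_something(line)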

except Exception as e:
    print(e)

See also: How to run a process with a timeout and still get stdout at runtime
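
For a per-line timeout with threads, one common pattern (a different technique from the watcher above) is to let a reader thread push lines onto a queue.Queue and have the main thread wait on the queue with a timeout. A minimal sketch, with iter_lines and pump as illustrative names only; it uses nothing platform-specific, so it should also work on Windows:

```python
import queue
import subprocess
import threading


def iter_lines(command, timeout_s):
    """Yield lines (bytes) from command's output; stop after timeout_s of silence."""
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    lines = queue.Queue()

    def pump():
        # Runs in a background thread: blocking reads are harmless here
        for line in proc.stdout:
            lines.put(line)
        lines.put(None)  # sentinel marking EOF

    threading.Thread(target=pump, daemon=True).start()
    try:
        while True:
            try:
                line = lines.get(timeout=timeout_s)
            except queue.Empty:
                return  # no line arrived within timeout_s
            if line is None:
                return  # EOF
            yield line
    finally:
        # Killing the child makes the reader thread see EOF and finish
        proc.kill()
        proc.wait()
```

The main loop becomes `for line in iter_lines(command, 10): ...`, and breaking out of it when do_something() fails still runs the cleanup once the generator is closed.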


0
votes

import asyncio

async def read_stdout(process):
    # Yield decoded lines from the child's stdout until EOF
    while True:
        line = await process.stdout.readline()
        if not line:
            break
        yield line.decode().strip()


async def main():
    # Create a subprocess
    process = await asyncio.create_subprocess_exec('ls', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

    # Set the timeout in seconds
    timeout = 10

    # Create the generator once; a fresh one per iteration is unnecessary
    lines = read_stdout(process)
    try:
        while True:
            try:
                line = await asyncio.wait_for(lines.__anext__(), timeout=timeout)
            except StopAsyncIteration:
                break  # EOF: the generator is exhausted
            print(line)  # may be an empty string for a blank line
    except asyncio.TimeoutError:
        # If no data is available within the timeout, handle it
        print("Timeout occurred")
    finally:
        # Ensure the subprocess is terminated if necessary
        if process.returncode is None:
            process.terminate()
            await process.wait()

asyncio.run(main())