说我们有多个类似于以下的子过程,这些子过程将一些结果实时打印到sys.stdout或sys.stderr。
proc1 = subprocess.Popen(['cmd1'],
env=venv1,
stdout=sys.stdout,
stderr=sys.stderr,
)
proc2 = subprocess.Popen(['cmd2'],
env=venv2,
stdout=sys.stdout,
stderr=sys.stderr,
)
但是,在终端中执行此脚本后,在查看正在打印的内容时,很难区分是哪个打印来自第一个过程,哪个来自第二个过程。
是否有解决方案可以单独查看每个过程的标准输出,例如是否可以对终端屏幕进行分区并且每个分区都可以显示每个过程的打印结果?
我为您编写了一个curses应用程序,它将满足您的要求:将终端窗口划分为多个分区,然后观察不同分区中的不同输出流。
[watch_fd_in_panes
函数将获取列表列表,其中子列表指定在每个分区内要监视的文件描述符。
这是示例调用代码的外观:
import subprocess
from watcher import watch_fds_in_panes
proc1 = subprocess.Popen('for i in `seq 30`; do date; sleep 1 ; done',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# this process also writes something on stderr
proc2 = subprocess.Popen('ls -l /asdf; for i in `seq 20`; do echo $i; sleep 0.5; done',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
proc3 = subprocess.Popen(['echo', 'hello'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
watch_fds_in_panes([[proc1.stdout.fileno(), proc1.stderr.fileno()],
[proc2.stdout.fileno(), proc2.stderr.fileno()],
[proc3.stdout.fileno(), proc3.stderr.fileno()]],
sleep_at_end=3.)
except KeyboardInterrupt:
print("interrupted")
proc1.kill()
proc2.kill()
proc3.kill()
要运行它,您将需要以下两个文件:
panes.py
import curses
class Panes:
"""
curses-based app that divides the screen into a number of scrollable
panes and lets the caller write text into them
"""
def start(self, num_panes):
"set up the panes and initialise the app"
# curses init
self.num = num_panes
self.stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
# split the screen into number of panes stacked vertically,
# drawing some horizontal separator lines
scr_height, scr_width = self.stdscr.getmaxyx()
div_ys = [scr_height * i // self.num for i in range(1, self.num)]
for y in div_ys:
self.stdscr.addstr(y, 0, '-' * scr_width)
self.stdscr.refresh()
# 'boundaries' contains y coords of separator lines including notional
# separator lines above and below everything, and then the panes
# occupy the spaces between these
boundaries = [-1] + div_ys + [scr_height]
self.panes = []
for i in range(self.num):
top = boundaries[i] + 1
bottom = boundaries[i + 1] - 1
height = bottom - top + 1
width = scr_width
# create a scrollable pad for this pane, of height at least
# 'height' (could be more to retain some scrollback history)
pad = curses.newpad(height, width)
pad.scrollok(True)
self.panes.append({'pad': pad,
'coords': [top, 0, bottom, width],
'height': height})
def write(self, pane_num, text):
"write text to the specified pane number (from 0 to num_panes-1)"
pane = self.panes[pane_num]
pad = pane['pad']
y, x = pad.getyx()
pad.addstr(y, x, text)
y, x = pad.getyx()
view_top = max(y - pane['height'], 0)
pad.refresh(view_top, 0, *pane['coords'])
def end(self):
"restore the original terminal behaviour"
curses.nocbreak()
self.stdscr.keypad(0)
curses.echo()
curses.endwin()
和watcher.py
import os
import select
import time
from panes import Panes
def watch_fds_in_panes(fds_by_pane, sleep_at_end=0):
"""
Use panes to watch output from a number of fds that are writing data.
fds_by_pane contains a list of lists of fds to watch in each pane.
"""
panes = Panes()
npane = len(fds_by_pane)
panes.start(npane)
pane_num_for_fd = {}
active_fds = []
data_tmpl = {}
for pane_num, pane_fds in enumerate(fds_by_pane):
for fd in pane_fds:
active_fds.append(fd)
pane_num_for_fd[fd] = pane_num
data_tmpl[fd] = bytes()
try:
while active_fds:
all_data = data_tmpl.copy()
timeout = None
while True:
fds_read, _, _ = select.select(active_fds, [], [], timeout)
timeout = 0
if fds_read:
for fd in fds_read:
data = os.read(fd, 1)
if data:
all_data[fd] += data
else:
active_fds.remove(fd) # saw EOF
else:
# no more data ready to read
break
for fd, data in all_data.items():
if data:
strng = data.decode('utf-8')
panes.write(pane_num_for_fd[fd], strng)
except KeyboardInterrupt:
panes.end()
raise
time.sleep(sleep_at_end)
panes.end()
最后,这是上面运行中的代码的屏幕截图:
在此示例中,我们正在监视相关分区中每个进程的stdout和stderr。在屏幕截图中,proc2在循环开始之前写到stderr的行(关于/asdf
)出现在proc2在循环的第一次迭代中写到stdout的第一行(即1
滚动到分区的顶部),但是由于它们被写入不同的管道而无法控制。