我的目标:每M秒从子进程中读取最新的“块”(N行)流式标准输出。
当前代码:
我想要实现的是等待M秒之后,如果它总是读取latest N行,而不是标准输出中的后续N行(它们可以在我只对最新感兴趣)
我的最终目标是产生一个线程来运行该过程并继续保存最新的行,然后在需要最新的流结果时从主进程中调用。
任何帮助将不胜感激!
#!/usr/bin/env python3
import signal
import time
from subprocess import Popen, PIPE
sig = signal.SIGTERM
N=9
M=5
countlines=0
p = Popen(["myprogram"], stdout=PIPE, bufsize=1, universal_newlines=True)
chunk=[]
for line in p.stdout:
countlines+=1
chunk.append(line)
if len(chunk)==N:
print(chunk)
chunk=[]
time.sleep(M)
if countlines>100:
p.send_signal(sig)
break
print("done")
经过大量搜索,我在这里偶然发现了一个解决方案:
https://eli.thegreenplace.net/2017/interacting-with-a-long-running-child-process-in-python/
Eli的“启动,交互,实时获取输出,终止”代码部分为我工作。到目前为止,这是我找到的最优雅的解决方案。
适应上面的问题,并写在一个类中(此处未显示):
def output_reader(self,proc):
chunk=[]
countlines=0
for line in iter(proc.stdout.readline, b''):
countlines+=1
chunk.append(line.decode("utf-8"))
if countlines==N:
self.current_chunk = chunk
chunk=[]
countlines=0
def main():
proc = subprocess.Popen(['myprocess'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
t = threading.Thread(target=output_reader, args=(proc,))
t.start()
try:
time.sleep(0.2)
for i in range(10):
time.sleep(1) # waits a while before getting latest lines
print(self.current_chunk)
finally:
proc.terminate()
try:
proc.wait(timeout=0.2)
print('== subprocess exited with rc =', proc.returncode)
except subprocess.TimeoutExpired:
print('subprocess did not terminate in time')
t.join()
这里是另一种可能的解决方案。该程序将在管道中作为单独的进程运行,并提供一个REST API,它将返回在stdin上读取的最后N行。它在flask中使用run
,因此,在外部环境可以访问本地服务器端口进行请求的情况下,不应使用它,尽管可以对此进行调整。
import sys
import time
import threading
import argparse
from flask import Flask, request
from flask_restful import Resource, Api
class Server:
def __init__(self):
self.data = {'at_eof': False,
'lines_read': 0,
'latest_lines': []}
self.thread = None
self.args = None
self.stop = False
def parse_args(self):
parser = argparse.ArgumentParser()
parser.add_argument("num_lines", type=int,
help="number of lines to cache")
parser.add_argument("port", type=int,
help="port to serve on")
self.args = parser.parse_args()
def start_updater(self):
def updater():
lines = self.data['latest_lines']
while True:
if self.stop:
return
line = sys.stdin.readline()
if not line:
break
self.data['lines_read'] += 1
lines.append(line)
while len(lines) > self.args.num_lines:
lines.pop(0)
self.data['at_eof'] = True
self.thread = threading.Thread(target=updater)
self.thread.start()
def get_data(self):
return self.data
def shutdown(self):
self.stop = True
func = request.environ.get('werkzeug.server.shutdown')
if func:
func()
return 'Shutting down'
else:
return 'shutdown failed'
def add_apis(self, app):
class GetData(Resource):
get = self.get_data
class Shutdown(Resource):
get = self.shutdown
api = Api(app)
api.add_resource(GetData, "/getdata")
api.add_resource(Shutdown, "/shutdown")
def run(self):
self.parse_args()
self.start_updater()
app = Flask(__name__)
self.add_apis(app)
app.run(port=self.args.port)
server = Server()
server.run()
示例用法:这是我们要提供其输出的测试程序:
import sys
import time
for i in range(100):
print("this is line {}".format(i))
sys.stdout.flush()
time.sleep(.1)
和一个简单的管道来启动它(在Linux shell提示符下,但是可以通过subprocess.Popen
完成,在端口8001上提供最后5行:
python ./writer.py | python ./server.py 5 8001
一个示例查询,这里使用curl作为客户端,但是可以通过Python requests
来完成:
$ curl -s http://localhost:8001/getdata
{"at_eof": false, "lines_read": 30, "latest_lines": ["this is line 25\n", "this is line 26\n", "this is line 27\n", "this is line 28\n", "this is line 29\n"]}
服务器还提供了一个http://localhost:<port>/shutdown
URL来终止它。如果您在第一次看到"at_eof": true
之前就调用它,那么期望编写器因管道破裂而死亡。