这是我的应用架构。
在我的代码中,有一个 pedestrian.py 文件,它使用 while 循环从 RTSP 链接读取帧,
完成行人检测(代码见此链接)之后,将处理后的帧缓存到 Redis 中。
(请注意,在循环中,每次输出的帧都会覆盖上一次循环输出的帧。这意味着在任何时刻,Redis 中只存在一帧。)
然后在flask应用中,我从redis中读取处理过的frame并发送给客户端。
这是我的行人检测的代码。
from redis import Redis
from concurrent.futures import ThreadPoolExecutor
import cv2
import torch
from os import environ
# Module-level Redis client; RealTimeTracking.run() stores the latest annotated
# JPEG through it under the 'frame' key.
# NOTE(review): '111.222.333.444' is not a valid IPv4 address (octets exceed
# 255) -- presumably a placeholder for the real Redis host; confirm.
r = Redis('111.222.333.444')
class RealTimeTracking(object):
    """
    Read frames from a video source, run pedestrian detection and tracking on
    the most recent frame, and cache the annotated JPEG in Redis under the
    ``'frame'`` key (every write overwrites the previous one).

    A single background worker (:meth:`update`) keeps ``self.status`` and
    ``self.frame`` refreshed with the newest capture result, while
    :meth:`run` processes frames in the calling thread.

    Args:
        cfg: deepsort dict and yolo-model cfg from server_cfg file
        args: parse_args inputs; ``args.input`` is handed to
            ``cv2.VideoCapture`` and ``args.use_cuda`` requests GPU inference
    """

    def __init__(self, cfg, args):
        """Build the detector and tracker, open the capture and start the reader worker."""
        # Function-scope import so this class does not depend on a file-level
        # import that may be missing.
        import warnings

        self.cfg = cfg
        self.args = args

        use_cuda = self.args.use_cuda and torch.cuda.is_available()
        if not use_cuda:
            # Emit a warning rather than raising it: raising made the CPU path
            # (which the use_cuda flag passed below exists for) unreachable.
            warnings.warn("Running in cpu mode!", UserWarning)

        self.detector = build_detector(cfg, use_cuda=use_cuda)
        self.deepsort = build_tracker(cfg, use_cuda=use_cuda)
        self.class_names = self.detector.class_names

        # Create a VideoCapture object
        self.vdo = cv2.VideoCapture(self.args.input)
        self.status, self.frame = None, None
        self.total_frames = int(self.vdo.get(cv2.CAP_PROP_FRAME_COUNT))
        self.im_width = int(self.vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.im_height = int(self.vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.output_frame = None

        # One worker thread keeps reading frames so that run() always sees the
        # newest one. The Future returned by submit() is not kept, so an
        # exception raised inside update() is never reported.
        self.thread = ThreadPoolExecutor(max_workers=1)
        self.thread.submit(self.update)
        print('streaming started ...')

    def update(self):
        """Endlessly overwrite ``self.status``/``self.frame`` with the latest read result."""
        while True:
            # NOTE(review): this loop spins without pausing while the capture
            # is not opened.
            if self.vdo.isOpened():
                (self.status, self.frame) = self.vdo.read()

    def run(self):
        """
        Endlessly annotate the newest frame and store it in Redis as JPEG bytes.

        This method never returns.
        """
        while True:
            # Snapshot both attributes: update() assigns them one after the
            # other from another thread, so status may already be truthy while
            # frame is still None. Checking explicitly replaces the former
            # blanket ``except AttributeError: pass`` which also hid genuine
            # errors raised inside detection().
            status, latest = self.status, self.frame
            if not status or latest is None:
                continue

            frame = latest.copy()
            # frame = cv2.resize(frame, (640, 480))
            self.detection(frame=frame)
            frame_to_bytes = cv2.imencode('.jpg', frame)[1].tobytes()
            r.set('frame', frame_to_bytes)

    def detection(self, frame):
        """
        Detect and track persons in ``frame`` and draw their boxes on it in place.

        Args:
            frame: BGR image (it is converted with ``cv2.COLOR_BGR2RGB`` before
                being passed to the detector); modified in place when tracks
                are returned.
        """
        im = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # do detection
        bbox_xywh, cls_conf, cls_ids = self.detector(im)
        if bbox_xywh is not None:
            # select person class
            mask = cls_ids == 0
            bbox_xywh = bbox_xywh[mask]
            # bbox dilation just in case bbox too small; only column 3 onward
            # is scaled (presumably the height of an xywh box -- confirm).
            bbox_xywh[:, 3:] *= 1.2
            cls_conf = cls_conf[mask]

            # do tracking
            outputs = self.deepsort.update(bbox_xywh, cls_conf, im)

            # draw boxes for visualization
            if len(outputs) > 0:
                self.draw_boxes(img=frame, output=outputs)

    @staticmethod
    def draw_boxes(img, output, offset=(0, 0)):
        """
        Draw one rectangle and identity label per track on ``img``.

        Args:
            img: image to draw on (modified in place).
            output: iterable of 5-element rows ``x1, y1, x2, y2, identity``.
            offset: ``(dx, dy)`` added to every box coordinate.

        Returns:
            The same ``img`` object.
        """
        for box in output:
            x1, y1, x2, y2, identity = [int(ii) for ii in box]
            x1 += offset[0]
            x2 += offset[0]
            y1 += offset[1]
            y2 += offset[1]

            # box text and bar
            color = compute_color_for_labels(identity)
            label = str(identity)
            t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
            # Filled rectangle behind the label so the white text stays readable.
            cv2.rectangle(img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
            cv2.putText(img, label, (x1, y1 + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 2, [255, 255, 255], 2)
        return img
if __name__ == "__main__":
    # Command-line options, e.g. --rtsp_link = 'rtsp://[email protected]/Channels/105'
    args = parse_args()

    # Start from the base configuration, then layer `model` and
    # `deep_sort_dict` on top of it, in that order.
    cfg = get_config()
    for overrides in (model, deep_sort_dict):
        cfg.merge_from_dict(overrides)

    # Build the tracker and enter its processing loop.
    vdo_trk = RealTimeTracking(cfg, args)
    vdo_trk.run()
这是 Flask 服务器的代码 app.py:
from dotenv import load_dotenv
from time import sleep
from os import getenv
from os.path import join
import subprocess
from flask import Response, Flask
from config.config import DevelopmentConfig
from redis import Redis
# Redis client that gen() polls for the latest frame stored under 'frame'.
# NOTE(review): '111.222.333.444' is not a valid IPv4 address -- presumably a
# placeholder for the real Redis host; confirm.
r = Redis('111.222.333.444')
# Flask application object the routes of this file are registered on.
app = Flask(__name__)
def gen():
    """
    Yield the frame cached in Redis, forever, as multipart JPEG parts.

    Each part is the ``--frame`` boundary line, a ``Content-Type: image/jpeg``
    header, the bytes stored under the ``'frame'`` key and a trailing CRLF.
    Nothing is yielded while the key is absent.
    """
    part_header = b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n'
    while True:
        jpeg = r.get('frame')
        if jpeg is None:
            # Nothing cached yet; poll again.
            continue
        yield part_header + jpeg + b'\r\n'
@app.route('/')
def video_feed():
    """Video streaming route. Put this in the src attribute of an img tag."""
    # The boundary name must match the '--frame' marker emitted by gen().
    frames = gen()
    return Response(frames, mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == '__main__':
    load_dotenv()
    app.config.from_object(DevelopmentConfig)

    # Launch the pedestrian detector as a child process before serving.
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://[email protected]/Channels/105']
    p = subprocess.Popen(cmd)
    try:
        # Fixed head start for the detector before the server begins serving.
        sleep(6)
        app.run()
    finally:
        # Do not leave the detector running once the server stops. `p` is
        # looked up when this runs, so it is whatever process the module-level
        # name refers to at shutdown.
        if p.poll() is None:
            p.terminate()
            try:
                p.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # The child ignored the terminate request; force it.
                p.kill()
                p.wait()
这段代码在我的系统中运行得很完美。
如你所见,在启动 Flask 服务器之前,我先通过 cmd 命令(子进程)对 RTSP 链接运行行人检测。
但是,我真正需要的是能够切换摄像头。也就是说,在 Flask 服务器运行期间,
我希望能够在请求到来的任何时刻终止 pedestrian.py 进程,
并用新的 --rtsp_link 参数重新启动 pedestrian.py
(即切换到另一台摄像机)。
类似下面这样:
def _switch_camera(rtsp_link):
    """
    Stop the running pedestrian.py process, if any, and start one on ``rtsp_link``.

    The process handle is kept in the module-level name ``p`` so that the next
    switch can stop the process started by this one.

    Args:
        rtsp_link: RTSP URL passed to pedestrian.py via ``--rtsp_link``.

    Returns:
        A short plain-text confirmation usable as a Flask response body.
    """
    # NOTE(review): concurrent switch requests are not serialized here.
    global p
    current = globals().get('p')
    if current is not None and current.poll() is None:
        current.terminate()
        try:
            current.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # The child ignored the terminate request; force it.
            current.kill()
            current.wait()

    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=' + rtsp_link]
    p = subprocess.Popen(cmd)
    return 'pedestrian detection restarted on ' + rtsp_link


@app.route('/cam1')
def cam1():
    """Switch the pedestrian detector to channel 101."""
    return _switch_camera('rtsp://[email protected]/Channels/101')


@app.route('/cam2')
def cam2():
    """Switch the pedestrian detector to channel 110."""
    return _switch_camera('rtsp://[email protected]/Channels/110')
我对 Flask 的了解可能还不够。我可能需要使用 POST 方法和身份验证。
你能告诉我如何在这个代码中实现这样的事情吗?
我找到了一种自动启动和停止行人检测的方法。我的代码仓库:
from os.path import join
from os import getenv, environ
from dotenv import load_dotenv
import argparse
from threading import Thread
from redis import Redis
from flask import Response, Flask, jsonify, request, abort
from rtsp_threaded_tracker import RealTimeTracking
from server_cfg import model, deep_sort_dict
from config.config import DevelopmentConfig
from utils.parser import get_config
# Redis client that gen() reads the latest frame from (key 'frame').
redis_cache = Redis('127.0.0.1')
# Flask application object the routes of this file are registered on.
app = Flask(__name__)
# Run-state flag kept in the process environment: process_manager() switches
# it between 'on' and 'off'.
# NOTE(review): presumably the tracker loop in rtsp_threaded_tracker polls this
# flag to know when to stop -- that module is not visible here; confirm.
environ['in_progress'] = 'off'
def parse_args(argv=None):
    """
    Parses the arguments

    Defaults come from the environment: ``camera_stream`` for ``--input`` and
    ``<project_root>/<model_type>`` for ``--model``.

    Args:
        argv: optional list of argument strings; when ``None`` the process
            command line is parsed, as before.

    Returns:
        argparse Namespace with ``input``, ``model`` and ``use_cuda``
        (``use_cuda`` is True unless ``--cpu`` is given).

    Raises:
        RuntimeError: if ``project_root`` or ``model_type`` is missing from
            the environment.
    """
    # Explicit check instead of `assert`: asserts are stripped under -O, and a
    # missing model_type previously surfaced as an opaque TypeError from join().
    missing = [name for name in ('project_root', 'model_type') if name not in environ]
    if missing:
        raise RuntimeError(
            "missing required environment variable(s): " + ", ".join(missing))

    project_root = getenv('project_root')
    parser = argparse.ArgumentParser()
    parser.add_argument("--input",
                        type=str,
                        default=getenv('camera_stream'))
    parser.add_argument("--model",
                        type=str,
                        default=join(project_root,
                                     getenv('model_type')))
    parser.add_argument("--cpu",
                        dest="use_cuda",
                        action="store_false", default=True)
    return parser.parse_args(argv)
def gen():
    """
    Returns: video frames from redis cache

    Runs forever; each yielded item is one multipart part made of the
    ``--frame`` boundary line, a JPEG content-type header, the cached bytes
    and a trailing CRLF. Iterations where the ``'frame'`` key is absent yield
    nothing.
    """
    boundary_and_header = b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n'
    trailer = b'\r\n'
    while True:
        cached = redis_cache.get('frame')
        if cached is None:
            continue
        yield boundary_and_header + cached + trailer
def pedestrian_tracking(cfg, args):
    """
    starts the pedestrian detection on rtsp link

    Builds a RealTimeTracking instance and calls its run() method in the
    calling thread.

    Args:
        cfg: configuration object handed to RealTimeTracking
        args: parsed arguments handed to RealTimeTracking
    Returns:
        None
    """
    RealTimeTracking(cfg, args).run()
def trigger_process(cfg, args):
    """
    triggers pedestrian_tracking process on rtsp link using a thread

    Only failures while creating or starting the thread (or building the
    success response) reach the except branch; errors raised later inside the
    worker thread are not seen here.

    Args:
        cfg: configuration object forwarded to pedestrian_tracking
        args: parsed arguments forwarded to pedestrian_tracking
    Returns:
        JSON response carrying a ``message`` field
    """
    try:
        Thread(target=pedestrian_tracking, args=(cfg, args)).start()
        started = {"message": "Pedestrian detection started successfully"}
        return jsonify(started)
    except Exception:
        failed = {'message': "Unexpected exception occured in process"}
        return jsonify(failed)
@app.errorhandler(400)
def bad_argument(error):
    """
    Turn a 400 error into a JSON body and keep the 400 status code.

    Args:
        error: the raised HTTP error; its ``description`` is either the dict
            passed to ``abort(400, {...})`` or a plain string.

    Returns:
        ``(response, 400)`` where the response is ``{"message": ...}``.
    """
    description = error.description
    if isinstance(description, dict):
        message = description.get('message', str(description))
    else:
        # Errors not raised through abort(400, {...}) -- e.g. a missing query
        # parameter -- carry a string description; indexing it with
        # ['message'] used to raise TypeError and produce a 500.
        message = str(description)
    # Return the status explicitly; a bare jsonify() would answer with 200.
    return jsonify({'message': message}), 400
# Routes
@app.route('/stream', methods=['GET'])
def stream():
    """
    Provides video frames on http link

    Returns:
        multipart/x-mixed-replace response fed by gen()
    """
    # The boundary name must match the '--frame' marker emitted by gen().
    frame_source = gen()
    return Response(frame_source, mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route("/run", methods=['GET'])
def process_manager():
    """
    Start or stop pedestrian tracking.

    request parameters:
        run: "1" -> start the pedestrian tracking
             "0" -> stop it
        camera_stream (optional): rtsp link to security camera, or a decimal
            device index (converted to int)

    Responds 400 when ``run`` is missing or is not 0/1, and 503 when starting
    the tracker fails.

    :return: JSON response carrying a ``message`` field
    """
    global cfg, args

    data = request.args
    raw_status = data.get('run')
    # isdecimal() (unlike isnumeric()) only accepts strings int() can parse.
    if raw_status is None or not raw_status.isdecimal() or int(raw_status) not in (0, 1):
        abort(400, {'message': f"bad argument for run {raw_status}"})
    status = int(raw_status)

    if status == 0:
        if environ.get('in_progress', 'off') == 'off':
            return jsonify({"message": "pedestrian detection is already terminated!"})
        # Only the flag is flipped here.
        # NOTE(review): presumably the tracker loop watches this flag and
        # exits -- not visible in this file; confirm.
        environ['in_progress'] = 'off'
        return jsonify({"message": "Pedestrian detection terminated!"})

    # status == 1
    if environ.get('in_progress', 'off') != 'off':
        # if pedestrian tracking is running, don't start another one (we are short of gpu resources)
        return jsonify({"message": " Pedestrian detection is already in progress."})

    # if pedestrian tracking is not running, start it off!
    try:
        vdo = data.get('camera_stream')
        if vdo:
            # Keep supporting numeric device indexes, but accept the RTSP URL
            # the API documents; int() on a URL raised and produced a 503.
            args.input = int(vdo) if vdo.isdecimal() else vdo
        environ['in_progress'] = 'on'
        return trigger_process(cfg, args)
    except Exception:
        # Also reached when cfg/args were never initialised at module level.
        environ['in_progress'] = 'off'
        abort(503)
if __name__ == '__main__':
    # Load the .env file first: parse_args() below takes its defaults from
    # environment variables (presumably the ones this file provides).
    load_dotenv()
    app.config.from_object(DevelopmentConfig)

    # BackProcess Initialization
    # process_manager() reads these two module-level names via `global`.
    args = parse_args()
    cfg = get_config()
    for overrides in (model, deep_sort_dict):
        cfg.merge_from_dict(overrides)

    # Start the flask app
    app.run()