如何完全控制一个与 flask 应用并行运行的进程(启动/终止,start/terminate)?

问题描述 投票:0回答:1

这是我的应用架构。

architecture of my application

在我的代码中,有一个 pedestrian.py 文件,它使用 while 循环从 rtsp 链接中读取帧,并在完成行人检测处理后(检测代码可在此链接中查看),将该帧缓存到 Redis 中。

(请注意,在循环中,每次输出的帧都会被前一次循环输出的帧所替代。这意味着在任何时刻,Redis中只存在一个帧。)

然后在flask应用中,我从redis中读取处理过的frame并发送给客户端。

这是我的行人检测的代码。

from redis import Redis
from concurrent.futures import ThreadPoolExecutor
import cv2
import torch
from os import environ


# Redis client used by RealTimeTracking.run() to publish the latest
# processed frame under the key 'frame'.
# NOTE(review): the host looks like a redacted placeholder (octets above 255
# are not a valid IPv4 address) — replace with the real Redis host.
r = Redis('111.222.333.444')

class RealTimeTracking(object):
    """
    Read frames from a video source, run pedestrian detection/tracking on
    them and publish the annotated result to Redis.

    A background worker keeps ``self.frame`` updated with the most recent
    frame read from the capture, while :meth:`run` repeatedly annotates the
    latest frame and stores it as JPEG bytes under the Redis key ``frame``.
    Call :meth:`stop` to end both loops and release the capture.

    Args:
        cfg: deepsort dict and yolo-model cfg from server_cfg file
        args: parse_args inputs; ``args.input`` (video source) and
            ``args.use_cuda`` are read here.

    """

    # Pause (seconds) used while waiting for the capture or the first frame,
    # so the polling loops do not spin a CPU core at 100%.
    _IDLE_SLEEP = 0.01

    def __init__(self, cfg, args):
        # Imported locally so the class does not rely on extra module-level
        # imports being present in the file.
        import threading
        import warnings

        self.cfg = cfg
        self.args = args
        use_cuda = self.args.use_cuda and torch.cuda.is_available()

        if not use_cuda:
            # A warning category has to be emitted, not raised: raising it
            # aborted construction and made CPU mode impossible.
            warnings.warn("Running in cpu mode!", UserWarning)

        self.detector = build_detector(cfg, use_cuda=use_cuda)
        self.deepsort = build_tracker(cfg, use_cuda=use_cuda)
        self.class_names = self.detector.class_names

        # Create a VideoCapture object
        self.vdo = cv2.VideoCapture(self.args.input)
        self.status, self.frame = None, None
        self.total_frames = int(self.vdo.get(cv2.CAP_PROP_FRAME_COUNT))
        self.im_width = int(self.vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.im_height = int(self.vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))

        self.output_frame = None

        # Set by stop(); both loops poll it so they can terminate.
        self._stop_event = threading.Event()
        # Single worker that keeps reading frames in the background.
        self.thread = ThreadPoolExecutor(max_workers=1)
        self.thread.submit(self.update)
        print('streaming started ...')

    def update(self):
        """Keep ``self.status`` / ``self.frame`` updated until stopped."""
        from time import sleep

        while not self._stop_event.is_set():
            if self.vdo.isOpened():
                (self.status, self.frame) = self.vdo.read()
            else:
                # Capture not (yet) open: wait instead of busy-spinning.
                sleep(self._IDLE_SLEEP)

    def run(self):
        """
        Annotate the latest frame and publish it to Redis until stopped.

        Each iteration copies the most recent frame, runs detection on the
        copy and stores the JPEG-encoded result under the Redis key
        ``frame``.
        """
        from time import sleep

        while not self._stop_event.is_set():
            # Take a snapshot: the reader thread may replace (or clear)
            # self.frame between the status check and the copy.
            latest = self.frame
            if not self.status or latest is None:
                sleep(self._IDLE_SLEEP)
                continue
            frame = latest.copy()
            # frame = cv2.resize(frame, (640, 480))
            self.detection(frame=frame)
            frame_to_bytes = cv2.imencode('.jpg', frame)[1].tobytes()
            r.set('frame', frame_to_bytes)

    def stop(self):
        """
        Stop the reader and processing loops and release the video capture.

        Waits for the reader worker to finish its current read before the
        capture is released.
        """
        self._stop_event.set()
        self.thread.shutdown(wait=True)
        self.vdo.release()

    def detection(self, frame):
        """
        Detect and track people in ``frame`` and draw their boxes onto it.

        Args:
            frame: BGR image; it is modified in place when boxes are drawn.
        """
        im = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # do detection
        bbox_xywh, cls_conf, cls_ids = self.detector(im)
        if bbox_xywh is not None:
            # select person class
            mask = cls_ids == 0

            bbox_xywh = bbox_xywh[mask]
            bbox_xywh[:, 3:] *= 1.2  # bbox dilation just in case bbox too small
            cls_conf = cls_conf[mask]

            # do tracking
            outputs = self.deepsort.update(bbox_xywh, cls_conf, im)

            # draw boxes for visualization
            if len(outputs) > 0:
                self.draw_boxes(img=frame, output=outputs)

    @staticmethod
    def draw_boxes(img, output, offset=(0, 0)):
        """
        Draw one labelled rectangle per tracked box onto ``img``.

        Args:
            img: image to draw on (modified in place).
            output: iterable of boxes, each unpacking to
                ``x1, y1, x2, y2, identity``.
            offset: ``(dx, dy)`` added to every box coordinate.

        Returns:
            The same ``img`` object.
        """
        for box in output:
            x1, y1, x2, y2, identity = [int(ii) for ii in box]
            x1 += offset[0]
            x2 += offset[0]
            y1 += offset[1]
            y2 += offset[1]

            # box text and bar
            color = compute_color_for_labels(identity)
            label = '{:d}'.format(identity)
            t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
            cv2.rectangle(img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
            cv2.putText(img, label, (x1, y1 + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 2, [255, 255, 255], 2)
        return img


if __name__ == "__main__":
    # Script entry point: build the configuration, then start the tracker.
    # NOTE(review): parse_args, get_config, model and deep_sort_dict are not
    # defined or imported in this snippet — presumably they come from the
    # project's parser/config modules; confirm.
    # NOTE(review): the inline comment below mentions --rtsp_link, while
    # RealTimeTracking reads args.input — verify the argument name.

    args = parse_args() # argument: --rtsp_link = 'rtsp://[email protected]/Channels/105'
    cfg = get_config()
    # Merge the model and deep-sort settings into the config object.
    cfg.merge_from_dict(model)
    cfg.merge_from_dict(deep_sort_dict)
    vdo_trk = RealTimeTracking(cfg, args)
    # Blocks while run() loops, publishing annotated frames to Redis.
    vdo_trk.run()

这是flask服务器的代码 app.py:

from dotenv import load_dotenv
from time import sleep
from os import getenv
from os.path import join
import subprocess
from flask import Response, Flask

from config.config import DevelopmentConfig
from redis import Redis

# Redis client that gen() polls for the latest processed frame (key 'frame').
# NOTE(review): the host looks like a redacted placeholder (octets above 255
# are not a valid IPv4 address) — replace with the real Redis host.
r = Redis('111.222.333.444')
# Flask application object; routes below are registered on it.
app = Flask(__name__)

def gen():
    """Endlessly yield the frame cached in Redis as multipart JPEG parts."""
    part_header = b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n'
    part_trailer = b'\r\n'
    while True:
        jpeg = r.get('frame')
        if jpeg is None:
            # Nothing cached yet; poll again.
            continue
        yield part_header + jpeg + part_trailer

@app.route('/')
def video_feed():
    """Video streaming route. Put this in the src attribute of an img tag."""
    # Each part produced by gen() replaces the previous one in the browser.
    frames = gen()
    return Response(frames, mimetype='multipart/x-mixed-replace; boundary=frame')


if __name__ == '__main__':
    # Entry point: start pedestrian.py as a child process, then serve the
    # frames it caches in Redis through Flask.
    load_dotenv()
    app.config.from_object(DevelopmentConfig)
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://[email protected]/Channels/105']
    p = subprocess.Popen(cmd)
    try:
        # Give the detector a few seconds to start before serving.
        sleep(6)
        app.run()
    finally:
        # Do not leave the detector running after the server exits
        # (including Ctrl-C): ask it to terminate, then force-kill it if it
        # does not exit in time.
        if p.poll() is None:
            p.terminate()
            try:
                p.wait(timeout=5)
            except subprocess.TimeoutExpired:
                p.kill()
                p.wait()

这段代码在我的系统中运行得很完美。

如你所见,我在运行 flask 服务器之前,使用 cmd 命令在 rtsp 链接上启动行人检测。

但是,我真正需要的是能够切换摄像头。我的意思是,当 flask 服务器正在运行时,我希望能够在请求到来的任何时刻终止 pedestrian.py 进程,并使用新的 --rtsp_link 参数重新启动 pedestrian.py(切换到另一台摄像机)。

这样的事情。

@app.route('/cam1')
def cam1():
    """Sketch: stop the running detector and restart it on camera 101."""
    # Keep the new process handle at module level so it can be terminated
    # later instead of being lost when this view returns.
    global p
    # NOTE(review): stop() is a placeholder; it is not defined in this snippet.
    stop('pedestrian.py')
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://[email protected]/Channels/101']
    p = subprocess.Popen(cmd)
    # A Flask view must return a response; 204 = success, no content.
    return '', 204

@app.route('/cam2')
def cam2():
    """Sketch: stop the running detector and restart it on camera 110."""
    # Keep the new process handle at module level so it can be terminated
    # later instead of being lost when this view returns.
    global p
    # NOTE(review): stop() is a placeholder; it is not defined in this snippet.
    stop('pedestrian.py')
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://[email protected]/Channels/110']
    p = subprocess.Popen(cmd)
    # A Flask view must return a response; 204 = success, no content.
    return '', 204

我的 Flask 知识可能不够好。我可能需要使用 POST 方法和身份验证。

你能告诉我如何在这个代码中实现这样的事情吗?

python opencv flask redis video-streaming
1个回答
0
投票

我找到了一个自动启动和停止行人检测的方法。我的代码仓库:

from os.path import join
from os import getenv, environ
from dotenv import load_dotenv
import argparse
from threading import Thread

from redis import Redis
from flask import Response, Flask, jsonify, request, abort

from rtsp_threaded_tracker import RealTimeTracking
from server_cfg import model, deep_sort_dict
from config.config import DevelopmentConfig
from utils.parser import get_config

# Redis client that gen() polls for the latest processed frame (key 'frame').
redis_cache = Redis('127.0.0.1')
# Flask application object; routes below are registered on it.
app = Flask(__name__)
# 'on'/'off' flag, stored in this process's environment, that
# process_manager uses to remember whether tracking has been started.
environ['in_progress'] = 'off'


def parse_args():
    """
    Parses the command-line arguments.

    Defaults come from environment variables: ``camera_stream`` for
    ``--input`` and ``<project_root>/<model_type>`` for ``--model``.

    Returns:
        argparse Namespace with ``input``, ``model`` and ``use_cuda``
        (``use_cuda`` is True unless ``--cpu`` is given).

    Raises:
        RuntimeError: if ``project_root`` or ``model_type`` is not set in
            the environment.
    """
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, and an unset variable would otherwise surface as a
    # cryptic TypeError from ``join``.
    missing = [name for name in ('project_root', 'model_type')
               if getenv(name) is None]
    if missing:
        raise RuntimeError(
            f"missing required environment variable(s): {', '.join(missing)}")

    default_model = join(getenv('project_root'), getenv('model_type'))
    parser = argparse.ArgumentParser()

    parser.add_argument("--input",
                        type=str,
                        default=getenv('camera_stream'))

    parser.add_argument("--model",
                        type=str,
                        default=default_model)

    # --cpu switches use_cuda off; it stays True when the flag is absent.
    parser.add_argument("--cpu",
                        dest="use_cuda",
                        action="store_false", default=True)

    return parser.parse_args()


def gen():
    """
    Endlessly yields the frame cached in redis as multipart JPEG parts.

    Returns: generator of video frames from redis cache
    """
    part_header = b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n'
    part_trailer = b'\r\n'
    while True:
        jpeg = redis_cache.get('frame')
        if jpeg is None:
            # Nothing cached yet; poll again.
            continue
        yield part_header + jpeg + part_trailer


def pedestrian_tracking(cfg, args):
    """
    Builds a RealTimeTracking instance and runs it.

    Args:
        cfg: configuration object handed to RealTimeTracking
        args: parsed arguments handed to RealTimeTracking
    Returns:
        None
    """
    RealTimeTracking(cfg, args).run()


def trigger_process(cfg, args):
    """
    Starts pedestrian_tracking in a new thread and reports the outcome.

    Only failures while creating or starting the thread are reported here;
    exceptions raised later inside the thread do not reach this function.

    Args:
        cfg: configuration object forwarded to pedestrian_tracking
        args: parsed arguments forwarded to pedestrian_tracking
    Returns:
        JSON response with a "message" field
    """
    try:
        worker = Thread(target=pedestrian_tracking, args=(cfg, args))
        worker.start()
        started = {"message": "Pedestrian detection started successfully"}
        return jsonify(started)
    except Exception:
        failed = {'message': "Unexpected exception occured in process"}
        return jsonify(failed)


@app.errorhandler(400)
def bad_argument(error):
    """
    Turns a 400 error into a JSON body with a "message" field.

    ``abort(400, {...})`` in this file passes a dict as description, but
    other 400 errors carry a plain string; both are handled so the handler
    itself cannot fail with a TypeError/KeyError.
    """
    description = error.description
    if isinstance(description, dict) and 'message' in description:
        message = description['message']
    else:
        message = str(description)
    # NOTE(review): the response is sent with the default status code, not
    # 400 — confirm whether clients rely on that before changing it.
    return jsonify({'message': message})


# Routes
@app.route('/stream', methods=['GET'])
def stream():
    """
    Serves the frames produced by gen() over http.

    Returns:
        multipart response in which each part replaces the previous one
    """
    frames = gen()
    return Response(frames, mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route("/run", methods=['GET'])
def process_manager():
    """
    Starts or stops pedestrian tracking.

    request parameters:
    run (int): 1  -> start the pedestrian tracking
               0  -> stop it
    camera_stream (str, optional): rtsp link to security camera, or a
        numeric device index

    Responds with 400 when ``run`` is missing, not a number, or neither
    0 nor 1, and with 503 when starting the tracker fails.
    :return: JSON response with a "message" field
    """
    global cfg, args
    data = request.args
    raw_status = data.get('run')
    # isdecimal() only accepts strings that int() can parse; a missing
    # parameter is rejected here instead of surfacing as a server error.
    if raw_status is None or not raw_status.isdecimal():
        abort(400, {'message': f"bad argument for run {raw_status}"})
    status = int(raw_status)

    if status == 1:
        if environ.get('in_progress', 'off') != 'off':
            # if pedestrian tracking is running, don't start another one (we are short of gpu resources)
            return jsonify({"message": " Pedestrian detection is already in progress."})
        # if pedestrian tracking is not running, start it off!
        try:
            vdo = data.get('camera_stream')
            if vdo is not None:
                # Numeric values are device indexes; anything else (for
                # example an rtsp URL) is passed through unchanged.
                args.input = int(vdo) if vdo.isdecimal() else vdo
            environ['in_progress'] = 'on'
            return trigger_process(cfg, args)
        except Exception:
            environ['in_progress'] = 'off'
            abort(503)
    elif status == 0:
        if environ.get('in_progress', 'off') == 'off':
            return jsonify({"message": "pedestrian detection is already terminated!"})
        # NOTE(review): only the flag is flipped here; presumably the
        # tracker polls it to stop itself — confirm.
        environ['in_progress'] = 'off'
        return jsonify({"message": "Pedestrian detection terminated!"})

    # Any other number used to fall through and return None.
    abort(400, {'message': f"bad argument for run {raw_status}"})


if __name__ == '__main__':
    # Load variables from a .env file first: parse_args() takes its
    # defaults from environment variables.
    load_dotenv()
    app.config.from_object(DevelopmentConfig)

    # BackProcess Initialization
    # args and cfg are module-level globals; process_manager reads them
    # when a start request arrives.
    args = parse_args()
    cfg = get_config()
    cfg.merge_from_dict(model)
    cfg.merge_from_dict(deep_sort_dict)
    # Start the flask app
    app.run()
© www.soinside.com 2019 - 2024. All rights reserved.