I am integrating an IP camera with OpenCV in Python to do frame-by-frame processing on a live stream. I have configured the camera to 1 FPS, so one frame per second lands in the buffer for processing, but my algorithm takes about 4 seconds per frame. Unprocessed frames therefore pile up in the buffer and the backlog keeps growing, causing exponentially increasing delay. To solve this, I created a second thread that calls cv2.grab() to drain the buffer; each call advances the internal pointer to the most recent frame. In the main thread I call retrieve(), which gives me the last frame grabbed by that thread. This design fixed the frame backlog and removed the growing delay, but a constant delay of 12-13 seconds remains. I suspect that when cv2.retrieve() is called it does not return the latest frame, but one that is four or five frames behind it. Is there an API in OpenCV, or some other design pattern, that solves this so that I always get the most recent frame to process?
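For reference, this is roughly what my current setup looks like. It is only a minimal sketch: the camera URL is a placeholder, and the lock is there because a single cv2.VideoCapture object should not be used from two threads at once.

import threading
import cv2

CAMERA_URL = "rtsp://user:pass@192.168.1.10/stream"  # placeholder

cap = cv2.VideoCapture(CAMERA_URL)
lock = threading.Lock()

def drain_buffer():
    # Second thread: keep calling grab() so the internal pointer always
    # sits on the most recently received frame.
    while True:
        with lock:
            cap.grab()

threading.Thread(target=drain_buffer, daemon=True).start()

while True:
    with lock:
        ok, frame = cap.retrieve()  # decode the last grabbed frame
    if not ok:
        continue
    # ... the ~4 second processing algorithm runs here ...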
If you don't mind sacrificing some speed, you can create a Python generator that opens the camera and yields frames.
import cv2

def ReadCamera(Camera):
    while True:
        # Re-open the capture on every iteration so the frame we read is the
        # most recent one, not a stale buffered frame.
        cap = cv2.VideoCapture(Camera)
        (grabbed, frame) = cap.read()
        cap.release()
        if grabbed:
            yield frame
Now, when you want to process a frame:
for frame in ReadCamera(Camera):
    ...  # process the frame here
This works quite well, except that opening and closing the camera for every frame adds up in time.
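If you want to keep a single capture open instead of reopening it for every frame, one variant (just a sketch; the flush count of 5 below is a guess and depends on your camera and backend) is to discard buffered frames with grab() before each read():

import cv2

BUFFER_FLUSH = 5  # hypothetical number of stale frames to discard per read

def ReadCameraKeepOpen(Camera):
    cap = cv2.VideoCapture(Camera)
    try:
        while True:
            # Throw away whatever the driver has buffered so the next
            # read() decodes a frame close to "now".
            for _ in range(BUFFER_FLUSH):
                cap.grab()
            grabbed, frame = cap.read()
            if grabbed:
                yield frame
    finally:
        cap.release()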
The best way to implement this is with a thread; here is my code for doing it.
"""
This module contains the Streamer class, which is responsible for streaming the video from the RTSP camera.
Capture the video from the RTSP camera and store it in the queue.
NOTE:
You can preprocess the data before flow from here
"""
import cv2
from queue import Queue
import time
from env import RESOLUTION_X, RESOLUTION_Y,FPS
from threading import Thread
class Streamer:
    def __init__(self, rtsp):
        """
        Initialize the Streamer object, which is responsible for streaming
        the video from the RTSP camera.

        Attributes:
            stream (cv2.VideoCapture): The VideoCapture object.
            rtsp (str): The RTSP url.
            Q (Queue): The queue that stores the frames.
            running (bool): Flag indicating whether the Streamer is running.

        Args:
            rtsp (str): The RTSP url.
        """
        print("Creating Streamer object for", rtsp)
        self.stream = cv2.VideoCapture(rtsp)
        self.rtsp = rtsp
        # Bufferless VideoCapture
        # self.stream.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        # self.stream.set(cv2.CAP_PROP_FPS, 10)
        self.stream.set(cv2.CAP_PROP_FRAME_WIDTH, RESOLUTION_X)
        self.stream.set(cv2.CAP_PROP_FRAME_HEIGHT, RESOLUTION_Y)
        self.Q = Queue(maxsize=2)
        self.running = True
        print("Streamer object created for", rtsp)
    def info(self):
        """
        Print the information of the Streamer.
        """
        print("============================== Stream Info ==============================")
        print("| Stream:", self.rtsp, "|")
        print("| Queue Size:", self.Q.qsize(), "|")
        print("| Running:", self.running, "|")
        print("==========================================================================")
    def get_processed_frame(self):
        """
        Get the most recent frame from the Streamer.

        Returns:
            dict: A dictionary containing the frame and its capture time,
            or None if no frame is available yet.
        """
        if self.Q.empty():
            return None
        # Peek at the newest item without removing it, so the caller always
        # sees the latest frame the reader thread has produced.
        return self.Q.queue[-1]
    def release(self):
        """
        Release the underlying VideoCapture.
        """
        self.stream.release()

    def stop(self):
        """
        Stop the Streamer.
        """
        print("Stopping", self.rtsp, "Running:", self.running)
        self.running = False
    def start(self):
        """
        Start the Streamer: read frames as fast as the stream delivers them
        and keep only the most recent ones in the queue.
        """
        print("Starting streamer", self.rtsp, "Running:", self.running)
        while self.running:
            # FOR VIDEO CAPTURE AND TESTING FRAME BY FRAME, UNCOMMENT THIS:
            # while self.Q.full():
            #     time.sleep(0.00001)
            ret, frame = self.stream.read()
            if not ret:
                print("No frame for", self.rtsp)
                continue
            frame = cv2.resize(frame, (RESOLUTION_X, RESOLUTION_Y))
            if self.Q.full():
                # Drop the oldest frame so the queue never blocks and the
                # consumer always finds something close to "now".
                self.Q.get()
            self.Q.put({"frame": frame, "time": time.time()})
            # time.sleep(1 / FPS)
        self.release()
if __name__ == "__main__":
    streamer = Streamer("rtsp://localhost:8554/105")
    thread = Thread(target=streamer.start)
    thread.start()
    while streamer.running:
        data = streamer.get_processed_frame()
        if data is None:
            continue
        frame = data["frame"]
        cv2.imshow("frame", frame)
        # Press "q" to stop the reader thread and exit cleanly.
        if cv2.waitKey(1) & 0xFF == ord("q"):
            streamer.stop()
    thread.join()
    cv2.destroyAllWindows()
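The key idea is that the reader thread consumes frames at the stream's native rate and the queue only ever holds the newest one or two, so slow processing in the main loop can no longer make frames pile up in OpenCV's internal buffer; whenever the consumer is ready, it simply picks up whichever frame is most recent.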