使用 OpevCV VideoWriter 实时录制视频

问题描述 投票:0回答:2

我有一些通过 USB 分析视频的 OpenCV 项目,并且在某些情况下必须将视频流记录到文件中。使用我的软件的人抱怨 10 多分钟的录制产生的视频文件比应有的时间长了大约 20 秒。

我正在使用 openCV 的 VideoWriter。我尝试过将 CV2_CAP_PROP_FPS 设置为非常低的设置,并尝试在几秒钟内获取平均帧速率,以找到适合输出文件帧速率的良好设置。仍然不够接近实时以满足我的需求。

有人知道有什么好方法来确保我的视频录制接近实时吗?我应该使用 time.sleep (在 python 中)之类的东西来限制我的帧速率吗?或者有更好的方法吗?

opencv ffmpeg video-capture video-processing
2个回答
4
投票

我编写了一个实时跟踪器(用于神经生物学研究),我将其与许多网络摄像头一起使用,并注意到我的一些摄像头并没有非常精确地遵循帧速率;它们可能有点快或有点慢。为了保存“相当正确”的视频,从网络摄像头捕获帧的代码调用我的

VideoWriter
write(frame)
方法,该方法将帧放入队列中,并且由写入帧的单独“写入器”线程检索帧以恒定的速度。如果写入线程在写入下一帧时发现队列为空,则会重复上一帧。如果写入器线程在队列中发现多个帧,它将写入最新的帧并丢弃其他帧。 (可以在没有单独线程的情况下实现这一点,但对于实时跟踪器来说,让
write(frame)
快速返回是很好的。)

以下是我的代码的摘录。他们不会逐字运行,而是显示我刚才描述的内容。 (我计划在接下来的几周内将实时跟踪器放在 GitHub 上。) class VideoWriter: def __init__(self, fn=None, fcc='MJPG', fps=7.5): ... self.fcc, self.fps, self.dt = fcc, fps, 1./fps self.q, self._stop, self.n = Queue.Queue(), False, 0 self.wrtr = threading.Thread(target=self._writer) self.wrtr.start() def _writer(self): frm = firstTime = vw = None while True: if self._stop: break # get most recent frame while not self.q.empty(): frm = self.q.get_nowait() if frm is not None: if vw is None: vw = cv2.VideoWriter(self.fn+self._EXT, cvFourcc(self.fcc), self.fps, imgSize(frm), isColor=numChannels(frm)>1) firstTime = time.time() vw.write(frm) self.n += 1 dt = self.dt if firstTime is None else max(0, firstTime + self.n*self.dt - time.time()) time.sleep(dt) # write frame; can be called at rate different from fps def write(self, frm): self.q.put(frm)



0
投票
Ulrich Stern 的代码

,使其适用于我的 OAK D 相机并且几乎不会跳帧。他提到代码不应该立即运行,但我仍然想指出,我们需要 .release()

cv2.VideoWriter
才能获取完整的视频文件。
# https://stackoverflow.com/questions/35370360/recording-video-in-real-time-with-opevcv-videowriter
import threading
import cv2
import queue
import time


class ReasonableVideoWriter:

    def __init__(self, fn=None, fcc='MJPG', fps=30, res=(1280, 720)):
        self.fn = fn
        self.fcc, self.fps, self.dt = fcc, fps, 1./fps
        self.res = res
        self.q, self._stop, self.n = queue.Queue(), False, 0
        self.wrtr = threading.Thread(target=self._writer)
        self.wrtr.setDaemon(True)   # make the thread daemonic
        self.wrtr.start()
    

    def _writer(self):
        frm = firstTime = vw = None
        while True:
            if self._stop:
                self._stop = not self._stop
                break
            # get most recent frame
            while not self.q.empty():
                frm = self.q.get_nowait()

            if frm is not None:
                if vw is None:
                    vw = cv2.VideoWriter(self.fn, cv2.VideoWriter_fourcc(*self.fcc),
                        self.fps, self.res)
                    firstTime = time.time()
                vw.write(frm)
                self.n += 1

            dt = self.dt if firstTime is None else max(0,
                firstTime + self.n*self.dt - time.time())
            time.sleep(dt)

        vw.release()    # release the video file

    # write frame; can be called at rate different from fps
    def write(self, frm):
        self.q.put(frm)
    
    # stop
    def stop(self):
        self._stop = True

创建一个 
ReasonableVideoWriter

对象并使用

.write(frame)
.stop()
方法进行记录。
    

© www.soinside.com 2019 - 2024. All rights reserved.