在使用 opencv 进行视频捕获期间移动或调整窗口大小时,PyQt6 应用程序崩溃

问题描述 投票:0回答:1

我正在使用 PyQt6 开发一个应用程序,该应用程序使用 OpenCV 从 Decklink 捕获实时视频并将其显示在 QWidget 上。该应用程序在正常情况下运行良好。但是,我遇到一个持续存在的问题:每当我在视频播放期间移动或调整窗口大小时,应用程序就会崩溃。即使在此类用户交互期间尝试了各种方法来处理视频线程挂起和恢复后,也会出现此问题。

我正在寻找建议或解决方案来防止这些崩溃,最好是通过正确管理 PyQt6 和 OpenCV 之间的集成来在强大且响应迅速的 GUI 应用程序中进行视频捕获。下面是演示该问题的简化代码片段,以及有关我的环境和遇到的具体错误的详细信息。

感谢您提供的任何见解或建议。

class VideoThread(QThread):
    """Capture frames from an OpenCV source on a background thread.

    Each frame is converted to a QImage and delivered to the GUI thread via
    ``change_pixmap_signal``; no widget is ever touched from this thread.
    """

    # Emitted once per captured frame with a display-ready QImage.
    change_pixmap_signal = pyqtSignal(QImage)

    def __init__(self, video_input=4, fps=30, resolution=(1920, 1080), parent=None):
        """
        :param video_input: OpenCV capture index or path (4 = Decklink here).
        :param fps: Target capture frame rate.
        :param resolution: (width, height) requested from the device.
        :param parent: Optional parent QObject.
        """
        super().__init__(parent)
        self.fps = fps
        self.resolution = resolution
        self.video_input = video_input
        # Per-instance flags: as class attributes (original code) they would
        # be shared by every VideoThread instance.
        self.is_running = True
        self.is_paused = False  # pause flag

    def run(self):
        cap = cv2.VideoCapture(self.video_input)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
        try:
            while self.is_running:
                if not self.is_paused:  # skip capture while paused
                    ret, cv_img = cap.read()
                    if ret:
                        self.change_pixmap_signal.emit(self.convert_cv_qt(cv_img))
                # Sleep unconditionally: the original slept only inside the
                # capture branch, so a paused thread busy-spun at 100% CPU.
                time.sleep(1 / self.fps)  # frame-rate control
        finally:
            cap.release()

    def pause(self):
        """Stop emitting frames; the loop keeps running idle."""
        self.is_paused = True

    def resume(self):
        """Resume emitting frames after pause()."""
        self.is_paused = False

    def convert_cv_qt(self, cv_img):
        """Convert a BGR OpenCV frame into a detached QImage.

        The ``.copy()`` is essential: QImage does not own the buffer passed
        to its constructor, so without the copy it keeps pointing into
        ``rgb_image``'s numpy storage, which is freed when this method
        returns. Repainting that stale buffer (e.g. during a window move or
        resize) is the segmentation fault described in the question.
        """
        rgb_image = cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB)
        h, w, ch = rgb_image.shape
        bytes_per_line = ch * w
        return QImage(rgb_image.data, w, h, bytes_per_line,
                      QImage.Format.Format_RGB888).copy()

    def stop(self):
        """Request loop exit and block until the thread has finished."""
        self.is_running = False
        self.quit()
        self.wait()

class CameraApp(QWidget):
    """Main window that paints frames streamed in by a VideoThread."""

    # When True, paintEvent skips drawing (set while the window is minimized).
    ispaused = False
    pixmap: QPixmap

    def __init__(self):
        super().__init__()
        self.setGeometry(100, 100, 640, 480)
        self.initUI()
        self.pixmap = QPixmap()  # null pixmap until the first frame arrives
        self.thread = VideoThread()
        # Queued cross-thread connection: update_image runs on the GUI thread.
        self.thread.change_pixmap_signal.connect(self.update_image)
        self.thread.start()

    def initUI(self):
        """Build the (currently empty) layout for the window."""
        # Fix: the original assigned to ``self.layout``, shadowing the
        # built-in QWidget.layout() method; use a private name instead.
        self._layout = QVBoxLayout()
        self.setLayout(self._layout)

    def paintEvent(self, event):
        """Paint the most recent frame scaled to fill the widget."""
        if not self.ispaused:
            painter = QPainter(self)
            painter.drawPixmap(self.rect(), self.pixmap)

    def update_image(self, qt_img):
        """Slot (GUI thread): store the latest frame and request a repaint."""
        self.pixmap = QPixmap.fromImage(qt_img)
        self.update()

    def event(self, e):
        """Pause capture while minimized; resume when the window is restored."""
        if e.type() == QEvent.Type.WindowStateChange:
            if self.windowState() & Qt.WindowState.WindowMinimized:
                self.thread.pause()
                self.ispaused = True
            else:
                self.thread.resume()
                self.ispaused = False
        return super().event(e)

    def closeEvent(self, event):
        """Stop the capture thread cleanly before the window closes."""
        self.thread.stop()
        super().closeEvent(event)

我尝试在窗口调整大小和移动期间暂停视频捕获,但这种方法没有解决问题。我怀疑问题可能与 PyQt6 GUI 中图像帧的更新方式有关。应用程序因分段错误而崩溃,这表明在这些操作期间内存的管理或访问方式可能存在问题。任何有关如何处理或调试此问题的建议将不胜感激。

python multithreading qt opencv pyqt6
1个回答
0
投票

现代廉价视频采集卡通常使用 FPGA(现场可编程门阵列)来处理工作负载。 FPGA 在硬件中执行许多处理操作,显着减少了计算机 CPU 的负载。这使得采集卡非常高效,并且能够处理高分辨率、高帧速率的视频流。

提供的代码通过使用 numpy 和 PyQt6 模拟采集卡的工作负载:以 60fps 生成随机噪声图像,并以 30fps 刷新界面显示。numpy 可以有效地处理图像,而 PyQt6 可以管理实时显示,展示了如何通过足够的计算资源来实现高帧速率。

python

import time
import numpy as np
from PyQt6.QtWidgets import *
from PyQt6.QtCore import *
from PyQt6.QtGui import *


class SynchObject(QObject):
    """Emits a synchronization signal at a fixed frame rate."""

    # Fired once per timer tick to pace frame generation downstream.
    synch_SIGNAL = pyqtSignal()

    def __init__(self, fps=60, parent=None):
        """
        Create a sync source ticking at roughly *fps* times per second.

        :param fps: Frames per second
        :param parent: Parent QObject
        """
        super().__init__(parent)
        self.fps = fps
        # A QTimer drives sync(); interval is the closest whole-ms period.
        interval_ms = 1000 // fps
        self.syncTimer = QTimer(self)
        self.syncTimer.timeout.connect(self.sync)
        self.syncTimer.start(interval_ms)
        self._initialized = True

    def sync(self):
        """Forward the timer tick as the synchronization signal."""
        self.synch_SIGNAL.emit()


class RandomNoiseImageGenerator(QObject):
    """Generate random-noise frames on each sync tick and track achieved FPS."""

    def __init__(self, synchObject, resolution=QSize(1920, 1080)):
        """
        :param synchObject: SynchObject whose signal paces frame generation.
        :param resolution: Resolution of the generated frames.
        """
        super().__init__()
        self._frame = None
        self.frame_count = 0                  # frames counted since last FPS update
        self.last_update_time = time.time()
        self.resolution = resolution
        self.synchObject = synchObject
        self.fps = self.synchObject.fps       # seeded with target; replaced by measured value
        self._frame = self.generate_noise()   # have a frame ready before the first tick
        self.setPayload(True)

    def setPayload(self, _isPayload):
        """Enable the simulated per-tick workload.

        Fix: the original ignored ``_isPayload`` and connected the signal
        unconditionally; the flag is now honored (behavior is unchanged for
        the only call site, which passes True).

        :param _isPayload: When truthy, generate a new frame on every sync tick.
        """
        if _isPayload:
            self.synchObject.synch_SIGNAL.connect(self.capture_frame)

    def generate_noise(self):
        """Return one frame of uniform random RGB noise.

        :return: A contiguous (height, width, 3) uint8 numpy array.
        """
        height, width = self.resolution.height(), self.resolution.width()
        return np.ascontiguousarray(np.random.randint(0, 256, (height, width, 3), dtype=np.uint8))

    def capture_frame(self):
        """Slot for the sync signal: produce a new frame and update FPS."""
        self._frame = self.generate_noise()
        self.update_fps()

    def getFrame(self):
        """Return the current frame, or an all-white frame if none exists yet.

        :return: The current frame, or a white placeholder of the same size.
        """
        if self._frame is None:
            return np.ones((self.resolution.height(), self.resolution.width(), 3), dtype=np.uint8) * 255
        return self._frame

    def update_fps(self):
        """Recompute the measured FPS roughly once per second."""
        self.frame_count += 1
        current_time = time.time()
        elapsed_time = current_time - self.last_update_time
        if elapsed_time >= 1.0:  # refresh the FPS figure every second
            self.fps = self.frame_count / elapsed_time
            self.frame_count = 0
            self.last_update_time = current_time


class VideoApp(QApplication):
    """Main PyQt application: displays random-noise frames and reports FPS."""

    def __init__(self, argv):
        """
        :param argv: Command line arguments forwarded to QApplication.
        """
        super().__init__(argv)
        self.synchObject = SynchObject(60)  # generate frames at 60 FPS
        self.input1 = RandomNoiseImageGenerator(self.synchObject)
        self.widget = QWidget()
        self.mainLayout = QVBoxLayout()
        self.viewer = QLabel()
        self.fpsLabel = QLabel()
        self.displayLabel = QLabel()
        self.mainLayout.addWidget(self.viewer)
        self.mainLayout.addWidget(self.fpsLabel)
        self.mainLayout.addWidget(self.displayLabel)
        self.widget.setLayout(self.mainLayout)
        self.widget.show()
        self.viewer.setFixedSize(1920, 1080)  # full-HD viewer
        self.uiTimer = QTimer(self)
        self.uiTimer.timeout.connect(self.display_frame)
        self.uiTimer.start(1000 // 30)  # repaint the UI at 30 FPS
        QTimer.singleShot(10000, self.stop_app)  # auto-exit after 10 seconds

    def display_frame(self):
        """Display the current frame and update the FPS / display-time labels."""
        frame = self.input1.getFrame()
        if frame is not None and frame.size != 0:
            start_time = time.time()
            # Fix: pass bytesPerLine (the row stride) explicitly. Without it
            # QImage assumes 32-bit-aligned scanlines; 1920*3 happens to be
            # divisible by 4, but any other width would skew or corrupt the
            # image.
            image = QImage(frame.data, frame.shape[1], frame.shape[0],
                           frame.strides[0], QImage.Format.Format_BGR888)
            self.viewer.setPixmap(QPixmap.fromImage(image))
            display_time = time.time() - start_time
            self.displayLabel.setText(f"Frame displayed in {display_time:.6f} seconds")
            self.fpsLabel.setText(f"FPS: {self.input1.fps:.2f}")

    def stop_app(self):
        """Print the average FPS and exit the application."""
        print(f"Media FPS: {self.input1.fps:.2f}")
        self.exit()


# Example usage of the VideoApp class

if __name__ == "__main__":
    # Fix: the original nested a second, redundant `if __name__ == '__main__'`
    # guard inside this one; a single guard suffices.
    import sys
    import cProfile
    import pstats
    import io


    def main():
        """Create the application and run its event loop to completion."""
        app = VideoApp(sys.argv)
        app.exec()


    # Profile the whole run to analyze where time is spent.
    pr = cProfile.Profile()
    pr.enable()
    main()
    pr.disable()

    # Use pstats to sort (by cumulative time) and print the profiling results.
    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats()
    print(s.getvalue())

# cProfile.Profile() is used to profile the performance of the application. It records the execution time of different functions.
# pstats is used to analyze and sort the profiling results. It helps to identify which parts of the code are consuming the most time.

可以修改代码以使用线程捕获帧。然而,根据我的经验,我发现与直觉相反,最好不要使用线程来实现此目的。您可以比较 pstats 提供的数据来分析性能差异。

此外,您可以执行目视测试来比较结果。在此示例中,我们生成噪声,但如果您从记录秒表的相机捕获并重现帧,目标是确保屏幕上显示的时间与秒表上的时间匹配。

© www.soinside.com 2019 - 2024. All rights reserved.