我正在使用 PyQt6 开发一个应用程序,该应用程序使用 OpenCV 从 Decklink 捕获实时视频并将其显示在 QWidget 上。该应用程序在正常情况下运行良好。但是,我遇到一个持续存在的问题:每当我在视频播放期间移动或调整窗口大小时,应用程序就会崩溃。即使在此类用户交互期间尝试了各种方法来处理视频线程挂起和恢复后,也会出现此问题。
我正在寻找建议或解决方案来防止这些崩溃,最好是通过正确管理 PyQt6 和 OpenCV 之间的集成来在强大且响应迅速的 GUI 应用程序中进行视频捕获。下面是演示该问题的简化代码片段,以及有关我的环境和遇到的具体错误的详细信息。
感谢您提供的任何见解或建议。
class VideoThread(QThread):
change_pixmap_signal = pyqtSignal(QImage)
is_running = True
is_paused = False # Aggiungi un flag di pausa
def __init__(self, video_input=4, fps=30, resolution=(1920, 1080), parent=None):
super().__init__(parent)
self.fps = fps
self.resolution = resolution
self.video_input = video_input
def run(self):
cap = cv2.VideoCapture(self.video_input)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
while self.is_running:
if not self.is_paused: # Controlla se il thread è in pausa
ret, cv_img = cap.read()
if ret:
qt_img = self.convert_cv_qt(cv_img)
self.change_pixmap_signal.emit(qt_img)
time.sleep(1 / self.fps) # Controllo del frame rate
cap.release()
def pause(self):
self.is_paused = True
def resume(self):
self.is_paused = False
def convert_cv_qt(self, cv_img):
rgb_image = cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
bytes_per_line = ch * w
return QImage(rgb_image.data, w, h, bytes_per_line, QImage.Format.Format_RGB888)
def stop(self):
self.is_running = False
self.quit()
self.wait()
class CameraApp(QWidget):
ispaused = False
pixmap: QPixmap
def __init__(self):
super().__init__()
self.setGeometry(100, 100, 640, 480)
self.initUI()
self.pixmap = QPixmap()
self.thread = VideoThread()
self.thread.change_pixmap_signal.connect(self.update_image)
self.thread.start()
def initUI(self):
self.layout = QVBoxLayout()
self.setLayout(self.layout)
def paintEvent(self, event):
if not self.ispaused:
painter = QPainter(self)
painter.drawPixmap(self.rect(), self.pixmap)
def update_image(self, qt_img):
self.pixmap = QPixmap.fromImage(qt_img)
self.update()
def event(self, e):
if e.type() == QEvent.Type.WindowStateChange:
if self.windowState() & Qt.WindowState.WindowMinimized:
self.thread.pause()
self.ispaused = True
else:
self.thread.resume()
self.ispaused = False
return super().event(e)
def closeEvent(self, event):
self.thread.stop()
super().closeEvent(event)
我尝试在窗口调整大小和移动期间暂停视频捕获,但这种方法没有解决问题。我怀疑问题可能与 PyQt6 GUI 中图像帧的更新方式有关。应用程序因分段错误而崩溃,这表明在这些操作期间内存的管理或访问方式可能存在问题。任何有关如何处理或调试此问题的建议将不胜感激。
现代廉价视频采集卡通常使用 FPGA(现场可编程门阵列)来处理工作负载。 FPGA 在硬件中执行许多处理操作,显着减少了计算机 CPU 的负载。这使得采集卡非常高效,并且能够处理高分辨率、高帧速率的视频流。
提供的代码通过使用 numpy 和 PyQt6 以 60fps 生成和显示随机噪声图像来模拟采集卡的工作负载。 numpy 可以有效地处理图像,而 PyQt6 可以管理实时显示,展示了如何通过足够的计算资源来实现高帧速率。
python
import time
import numpy as np
from PyQt6.QtWidgets import *
from PyQt6.QtCore import *
from PyQt6.QtGui import *
class SynchObject(QObject):
"""
A class to emit synchronization signals at a specified FPS.
"""
# Signal to synchronize frame updates
synch_SIGNAL = pyqtSignal()
def __init__(self, fps=60, parent=None):
"""
Initialize the SynchObject with a specified FPS.
:param fps: Frames per second
:param parent: Parent QObject
"""
super().__init__(parent)
self.fps = fps
# Timer to emit sync signals at the desired FPS
self.syncTimer = QTimer(self)
self.syncTimer.timeout.connect(self.sync)
self.syncTimer.start(1000 // fps) # Set timer interval based on FPS
self._initialized = True
def sync(self):
"""
Emit the synchronization signal.
"""
self.synch_SIGNAL.emit()
class RandomNoiseImageGenerator(QObject):
"""
A class to generate frames of random noise and update FPS.
"""
def __init__(self, synchObject, resolution=QSize(1920, 1080)):
"""
Initialize the RandomNoiseImageGenerator with a synchronization object and resolution.
:param synchObject: The SynchObject to synchronize frame generation
:param resolution: Resolution of the generated frames
"""
super().__init__()
self._frame = None
self.frame_count = 0
self.last_update_time = time.time()
self.resolution = resolution
self.synchObject = synchObject
self.fps = self.synchObject.fps
self._frame = self.generate_noise() # Generate initial noise frame
self.setPayload(True)
def setPayload(self, _isPayload):
"""
Simulate a workload based on the computer's CPU if needed.
:param _isPayload: Flag to simulate workload
"""
# Connect the sync signal to the capture_frame slot
self.synchObject.synch_SIGNAL.connect(self.capture_frame)
def generate_noise(self):
"""
Generate a frame of random noise.
:return: A frame of random noise as a numpy array
"""
height, width = self.resolution.height(), self.resolution.width()
return np.ascontiguousarray(np.random.randint(0, 256, (height, width, 3), dtype=np.uint8))
def capture_frame(self):
"""
Capture a new frame of random noise and update FPS.
"""
self._frame = self.generate_noise()
self.update_fps()
def getFrame(self):
"""
Get the current frame.
:return: The current frame or a white frame if no frame is available
"""
if self._frame is None:
return np.ones((self.resolution.height(), self.resolution.width(), 3), dtype=np.uint8) * 255
return self._frame
def update_fps(self):
"""
Update the FPS (frames per second) value.
"""
self.frame_count += 1
current_time = time.time()
elapsed_time = current_time - self.last_update_time
if elapsed_time >= 1.0: # Update FPS every second
self.fps = self.frame_count / elapsed_time
self.frame_count = 0
self.last_update_time = current_time
class VideoApp(QApplication):
"""
The main PyQt application to display random noise frames and show FPS.
"""
def __init__(self, argv):
"""
Initialize the VideoApp with command line arguments.
:param argv: Command line arguments
"""
super().__init__(argv)
self.synchObject = SynchObject(60) # Set FPS to 60
self.input1 = RandomNoiseImageGenerator(self.synchObject)
self.widget = QWidget()
self.mainLayout = QVBoxLayout()
self.viewer = QLabel()
self.fpsLabel = QLabel()
self.displayLabel = QLabel()
self.mainLayout.addWidget(self.viewer)
self.mainLayout.addWidget(self.fpsLabel)
self.mainLayout.addWidget(self.displayLabel)
self.widget.setLayout(self.mainLayout)
self.widget.show()
self.viewer.setFixedSize(1920, 1080) # Set viewer size to full HD
self.uiTimer = QTimer(self)
self.uiTimer.timeout.connect(self.display_frame)
self.uiTimer.start(1000 // 30) # Update UI at 30 FPS
QTimer.singleShot(10000, self.stop_app) # Stop the app after 10 seconds
def display_frame(self):
"""
Display the current frame and update FPS and display time.
"""
frame = self.input1.getFrame()
if frame is not None and frame.size != 0:
start_time = time.time()
# Convert the frame to QImage and display it
image = QImage(frame.data, frame.shape[1], frame.shape[0], QImage.Format.Format_BGR888)
self.viewer.setPixmap(QPixmap.fromImage(image))
display_time = time.time() - start_time
self.displayLabel.setText(f"Frame displayed in {display_time:.6f} seconds")
self.fpsLabel.setText(f"FPS: {self.input1.fps:.2f}")
def stop_app(self):
"""
Print the average FPS and exit the application.
"""
print(f"Media FPS: {self.input1.fps:.2f}")
self.exit()
# Example usage of the VideoApp class
if __name__ == "__main__":
import sys
def main():
app = VideoApp(sys.argv)
app.exec()
if __name__ == '__main__':
import cProfile
import pstats
import io
# Profile the main function to analyze performance
pr = cProfile.Profile()
pr.enable()
main()
pr.disable()
# Use pstats to sort and print the profiling results
s = io.StringIO()
sortby = 'cumulative'
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
ps.print_stats()
print(s.getvalue())
# cProfile.Profile() is used to profile the performance of the application. It records the execution time of different functions.
# pstats is used to analyze and sort the profiling results. It helps to identify which parts of the code are consuming the most time.
可以修改代码以使用线程捕获帧。然而,根据我的经验,我发现与直觉相反,最好不要使用线程来实现此目的。您可以比较 pstats 提供的数据来分析性能差异。
此外,您可以执行目视测试来比较结果。在此示例中,我们生成噪声,但如果您从记录秒表的相机捕获并重现帧,目标是确保屏幕上显示的时间与秒表上的时间匹配。