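I am using a Raspberry Pi camera for real-time image processing. To cut down on processing time, I tried moving some of the image transformation and the video encoding into a separate process: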
import time
from multiprocessing import Event as ProcessEvent
from multiprocessing import Process, shared_memory

import cv2
import numpy as np
from picamera2 import MappedArray, Picamera2
from picamera2.request import CompletedRequest


class VideoEncoder(Process):
    def __init__(self):
        super(VideoEncoder, self).__init__()
        self.loop = ProcessEvent()
        self.loop.set()
        self.new_frame = ProcessEvent()
        self.new_frame.clear()
        # Doesn't work
        self.video_writer = cv2.VideoWriter(
            "my_very_long_gstreamer_pipeline",
            fourcc=0,
            fps=30,
            frameSize=(960, 720),
            isColor=True,
        )
        # # Works
        # self.video_writer = cv2.VideoWriter(
        #     "filename.avi",
        #     cv2.VideoWriter_fourcc(*"MJPG"),
        #     fps=30,
        #     frameSize=(960, 720),
        # )
        data = np.zeros((1232, 1640, 3), dtype=np.uint8).flatten()
        self.frame = shared_memory.SharedMemory(create=True, size=data.nbytes)

    def run(self):
        while self.loop.is_set():
            if self.new_frame.wait():
                self.process_frame()
                self.new_frame.clear()
        self.video_writer.release()

    def write(self, frame: np.ndarray):
        sha = np.ndarray(frame.shape, dtype=frame.dtype, buffer=self.frame.buf)
        sha[:] = frame[:]
        self.new_frame.set()
        print("Requested new frame")

    def process_frame(self):
        frame = np.ndarray((1232, 1640, 3), dtype=np.uint8, buffer=self.frame.buf)
        # Transform image in different ways...
        # Draw some stuff on image...
        # Takes some time...
        frame = cv2.resize(frame, (960, 720))
        print("Writing frame to video writer...")
        self.video_writer.write(frame)
        print(" > Written frame!")

    def stop(self):
        self.loop.clear()
        self.new_frame.set()


encoder = VideoEncoder()
encoder.start()


def cam_callback(request: CompletedRequest):
    with MappedArray(request, "main") as m:
        # Do some image processing...
        # Takes some time...
        # Once finished, send file to video stream
        encoder.write(m.array)


if __name__ == "__main__":
    picam = Picamera2(1)
    picam.configure(
        picam.create_video_configuration(
            main={"format": "RGB888", "size": (1640, 1232)}
        )
    )
    picam.pre_callback = cam_callback
    picam.start()
    time.sleep(10)
    picam.stop()
    encoder.stop()
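However, while it writes to a video file just fine, it fails when writing to a GStreamer pipeline.
Here is the result. Judging by the log, it seems to get stuck in self.video_writer.write(frame). Note that the same code runs fine when it is not in a separate process, and I do not get any GStreamer warnings or errors:
Requested new frame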
Writing frame to video writer...
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
Requested new frame
Requested new frame
Requested new frame
Requested new frame
Requested new frame
...
Requested new frame
Writing frame to video writer...
deprecated pixel format used, make sure you did set range correctly
Requested new frame
Requested new frame
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
> Written frame!
Requested new frame
Writing frame to video writer...
Requested new frame
...
I am not sure how to debug this. Any suggestions are appreciated!
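For reference, the shared-memory hand-off that write() and process_frame() rely on can be reduced to a small standard-library sketch. It is not the code from the post: the names FRAME_BYTES, child_sum and hand_off are hypothetical, the "frame" is just 16 bytes, and the "fork" start method is assumed (Linux, as on a Raspberry Pi). It only illustrates the lifecycle: the owner creates the block, the child attaches by name, both sides close their mapping, and the owner unlinks once.

```python
import multiprocessing as mp
from multiprocessing import shared_memory

# Assumption: Linux-style "fork" start method, as on a Raspberry Pi.
ctx = mp.get_context("fork")

FRAME_BYTES = 16  # hypothetical tiny "frame", for illustration only


def child_sum(shm_name, ready, result):
    """Child side: attach to the existing block by name and read it."""
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        ready.wait(timeout=10)  # wait until the parent has copied data in
        result.put(sum(shm.buf[:FRAME_BYTES]))
    finally:
        shm.close()  # drops this process's mapping only


def hand_off(payload: bytes) -> int:
    """Parent side: create the block, copy data in, signal the child."""
    assert len(payload) <= FRAME_BYTES
    shm = shared_memory.SharedMemory(create=True, size=FRAME_BYTES)
    ready = ctx.Event()
    result = ctx.Queue()
    worker = ctx.Process(target=child_sum, args=(shm.name, ready, result))
    try:
        worker.start()
        shm.buf[:len(payload)] = payload  # copy the "frame" into shared memory
        ready.set()                       # tell the child a frame is available
        total = result.get(timeout=10)
        worker.join()
    finally:
        shm.close()
        shm.unlink()  # the owner removes the block exactly once
    return total


if __name__ == "__main__":
    print(hand_off(bytes(range(FRAME_BYTES))))
```

A NumPy array can be laid over shm.buf in the same way the post does with np.ndarray(..., buffer=self.frame.buf); any such view has to be released before close() is called.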
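I am not entirely sure why, but I had to move the VideoWriter initialization into run() to make sure it is created in the right process context, and that was it! A likely explanation is that __init__ runs in the parent process while run() executes in the child; with the fork start method, threads that the pipeline started in the parent are not carried over into the child.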
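    def run(self):
        # Create the writer here so that it lives in the child process.
        # self.input_framerate is assumed to be set in __init__
        # (the question used fps=30).
        self.video_writer = cv2.VideoWriter(
            "gstreamer_pipeline",
            fourcc=0,
            fps=self.input_framerate,
            frameSize=(960, 720),
            isColor=True,
        )
        while self.loop.is_set():
            if self.new_frame.wait():
                self.process_frame()
                self.new_frame.clear()
        self.video_writer.release()
        # Remove the shared memory block once encoding has finished.
        self.frame.unlink()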
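To see why the placement matters, it can help to check which process actually executes each method of a Process subclass. The sketch below is a hypothetical illustration rather than part of the original code: PidReporter and compare_pids are made-up names, and the "fork" start method is assumed. It records the PID seen in __init__ and the PID seen in run().

```python
import multiprocessing as mp
import os

# Assumption: Linux-style "fork" start method, as on a Raspberry Pi.
ctx = mp.get_context("fork")


class PidReporter(ctx.Process):
    """Hypothetical worker that records which process runs each method."""

    def __init__(self, results):
        super().__init__()
        self.results = results
        self.init_pid = os.getpid()  # __init__ executes in the parent

    def run(self):
        # run() executes in the child once start() has been called
        self.results.put((self.init_pid, os.getpid()))


def compare_pids():
    """Return (pid seen in __init__, pid seen in run())."""
    results = ctx.Queue()
    worker = PidReporter(results)
    worker.start()
    pids = results.get(timeout=10)
    worker.join()
    return pids


if __name__ == "__main__":
    init_pid, run_pid = compare_pids()
    print(f"__init__ ran in PID {init_pid}, run() ran in PID {run_pid}")
```

The two PIDs differ, so any resource that is tied to the process that created it is safer to construct inside run(), as done with the VideoWriter above.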