I want to capture webcam video and microphone audio to disk in sync, while running real-time affect recognition and annotation on every video frame (via the FER Python library). The affect-analysis code is already written and working, so it is out of scope for this question.
I stumbled upon an elegant structure (see the code below) into which I can slot my analysis code, but it uses alsaaudio, which is Linux-only, and it never writes anything to a file.
How would I rewrite this to use PyAudio and write to disk with ffmpeg? (My own untested sketches of both halves are included below for reference.)
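For context, the per-frame hook looks roughly like this (illustrative only; `detector` and `annotate` are my own names, and FER's `top_emotion` API is used as I understand it):

import cv2
from fer import FER

detector = FER()  # FER(mtcnn=True) trades speed for a better face detector

def annotate(frame):
    """Run affect recognition on one BGR frame and draw the result on it."""
    emotion, score = detector.top_emotion(frame)  # e.g. ("happy", 0.97)
    if emotion is not None:
        cv2.putText(frame, f"{emotion} {score:.2f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)
    return frame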
# https://gist.github.com/benhoyle/787bddf71f4c36a4b05a9746ea0885f6
# Building Audio/Video Capture Objects
# The best way to proceed is to have a module for each modality but with a common interface.
import threading

class SensorSource:
    """Abstract object for a sensory modality."""

    def __init__(self):
        """Initialise object."""
        self.started = False
        self.thread = None

    def start(self):
        """Start capture source on a background thread."""
        if self.started:
            print('[!] Asynchronous capturing has already been started.')
            return None
        self.started = True
        self.thread = threading.Thread(target=self.update, args=())
        self.thread.start()
        return self

    def update(self):
        """Update data."""
        pass

    def read(self):
        """Read data."""
        pass

    def stop(self):
        """Stop capture thread."""
        self.started = False
        if self.thread is not None:
            self.thread.join()
# Video source
import cv2

class VideoSource(SensorSource):
    """Object for video using OpenCV."""

    def __init__(self, src=0):
        """Initialise video capture."""
        super().__init__()
        # width=640, height=480
        self.src = src
        self.cap = cv2.VideoCapture(self.src)
        # self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        # self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.grabbed, self.frame = self.cap.read()
        self.read_lock = threading.Lock()

    def update(self):
        """Keep grabbing the latest frame while capturing is on."""
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

    def read(self):
        """Return the latest (grabbed, frame) pair, copying the frame."""
        with self.read_lock:
            frame = self.frame.copy()
            grabbed = self.grabbed
        return grabbed, frame

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.cap.release()
# Audio source
import struct
from collections import deque
import numpy as np
import logging
import alsaaudio

class AudioSource(SensorSource):
    """Object for audio using alsaaudio."""

    def __init__(self, sample_freq=44100, nb_samples=65536):
        """Initialise audio capture."""
        super().__init__()
        # Initialise audio
        self.inp = alsaaudio.PCM(
            alsaaudio.PCM_CAPTURE,
            alsaaudio.PCM_NORMAL,
            device="default"
        )
        # Set attributes: mono, frequency, 16-bit little-endian samples
        self.inp.setchannels(1)
        self.inp.setrate(sample_freq)
        self.inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
        self.inp.setperiodsize(512)
        # Create a FIFO structure for the data
        self._s_fifo = deque([0] * nb_samples, maxlen=nb_samples)
        self.l = 0
        self.read_lock = threading.Lock()

    def update(self):
        """Keep appending new samples to the FIFO while capturing is on."""
        while self.started:
            self.l, data = self.inp.read()
            if self.l > 0:
                # Extract and format samples (signed 16-bit little-endian)
                raw_smp_l = struct.unpack('h' * self.l, data)
                with self.read_lock:
                    self._s_fifo.extend(raw_smp_l)
            else:
                logging.error(
                    f'Sampler error occurred (l={self.l}, len(data)={len(data)})'
                )

    def read(self):
        """Return the sample count and the FIFO contents as int16."""
        with self.read_lock:
            return self.l, np.asarray(self._s_fifo, dtype=np.int16)
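# --- Not part of the original gist: my untested sketch of the same
# --- AudioSource interface on top of PyAudio (cross-platform). The class
# --- name and the `chunk` parameter are my own; it assumes PyAudio's
# --- standard blocking-read API.
import pyaudio

class PyAudioSource(SensorSource):
    """Sketch: AudioSource drop-in built on PyAudio instead of alsaaudio."""

    def __init__(self, sample_freq=44100, nb_samples=65536, chunk=512):
        super().__init__()
        self._pa = pyaudio.PyAudio()
        self.chunk = chunk
        self.stream = self._pa.open(
            format=pyaudio.paInt16,   # 16-bit samples, as before
            channels=1,               # mono, as before
            rate=sample_freq,
            input=True,
            frames_per_buffer=chunk,
        )
        self._s_fifo = deque([0] * nb_samples, maxlen=nb_samples)
        self.l = 0
        self.read_lock = threading.Lock()

    def update(self):
        """Keep appending new samples to the FIFO while capturing is on."""
        while self.started:
            data = self.stream.read(self.chunk, exception_on_overflow=False)
            self.l = len(data) // 2  # two bytes per int16 sample
            with self.read_lock:
                self._s_fifo.extend(np.frombuffer(data, dtype=np.int16))

    def read(self):
        """Return the sample count and the FIFO contents as int16."""
        with self.read_lock:
            return self.l, np.asarray(self._s_fifo, dtype=np.int16)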
class CombinedSource:
    """Object to combine multiple modalities."""

    def __init__(self):
        """Initialise."""
        self.sources = dict()

    def add_source(self, source, name=None):
        """Add a source object.

        source is a derived class from SensorSource;
        name is an optional string name."""
        if not name:
            name = source.__class__.__name__
        self.sources[name] = source

    def start(self):
        """Start all sources."""
        for name, source in self.sources.items():
            source.start()

    def read(self):
        """Read from all sources; return as a dict keyed by source name."""
        data = dict()
        for name, source in self.sources.items():
            data[name] = source.read()[1]
        return data

    def stop(self):
        """Stop all sources."""
        for name, source in self.sources.items():
            source.stop()

    def __del__(self):
        for name, source in self.sources.items():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for name, source in self.sources.items():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()
class AVCapture(CombinedSource):
    """Auto-populate with audio and video."""

    def __init__(self):
        """Initialise with an audio source and a video source."""
        super().__init__()
        self.add_source(AudioSource(), "audio")
        self.add_source(VideoSource(), "video")
# Testing
# Some routines to test the objects - to be moved into a testing file.
import matplotlib.pyplot as plt

def test_make_sensor():
    s = SensorSource()
    assert s.__class__.__name__ == "SensorSource"

def test_make_video():
    v = VideoSource()
    assert v.__class__.__name__ == "VideoSource"

def test_make_audio():
    a = AudioSource()
    assert a.__class__.__name__ == "AudioSource"

def test_make_combined():
    c = CombinedSource()
    assert c.__class__.__name__ == "CombinedSource"

def test_capture():
    v = VideoSource()
    v.start()
    assert v.cap.isOpened()
    g, d = v.read()
    assert g
    assert d.shape == (480, 640, 3)
    plt.imshow(d[:, :, 0])
    v.stop()

def test_combined_object():
    c = CombinedSource()
    c.add_source(AudioSource(), "audio")
    c.add_source(VideoSource(), "video")
    c.start()
    data = c.read()
    plt.plot(data["audio"])
    plt.imshow(data["video"][:, :, 0])
    c.stop()
    del c

# Making It Easy
def full_test():
    av = AVCapture()
    assert av.__class__.__name__ == "AVCapture"
    av.start()
    source_names = [name for name in av.sources]
    assert "audio" in source_names
    assert "video" in source_names
    data = av.read()
    plt.imshow(data["video"][:, :, 0])
    plt.plot(data["audio"])
    av.stop()
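For the writing-to-disk half, the direction I have in mind is to pipe raw BGR frames into an ffmpeg subprocess rather than using cv2.VideoWriter. A rough, untested sketch (start_ffmpeg_video_writer is my own name; the 640x480 / 30 fps values are assumptions about my webcam):

import subprocess

def start_ffmpeg_video_writer(path, width=640, height=480, fps=30):
    """Spawn ffmpeg reading raw BGR frames on stdin and encoding to path."""
    cmd = [
        "ffmpeg", "-y",
        "-f", "rawvideo",           # uncompressed frames arrive on stdin
        "-pix_fmt", "bgr24",        # OpenCV's native channel order
        "-s", f"{width}x{height}",
        "-r", str(fps),
        "-i", "-",                  # read video from stdin
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        path,
    ]
    return subprocess.Popen(cmd, stdin=subprocess.PIPE)

# Usage sketch: feed annotated frames from the capture loop.
# av = AVCapture()
# av.start()
# proc = start_ffmpeg_video_writer("out.mp4")
# grabbed, frame = av.sources["video"].read()
# if grabbed:
#     proc.stdin.write(annotate(frame).tobytes())
# ...
# proc.stdin.close()
# proc.wait()

I assume audio could be fed to a second ffmpeg process the same way (with -f s16le -ar 44100 -ac 1 -i -) and the two files muxed afterwards, but I don't know whether that keeps the streams in sync, which is really the heart of my question.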