Python 中的组合非阻塞网络摄像头和麦克风捕获

问题描述（投票：0 · 回答：0）

我想同步捕获网络摄像头视频和麦克风音频到磁盘,同时对每个视频帧进行实时影响识别和注释(通过 FER python 库)。影响分析代码已经写好并且可以运行,所以它不在这个问题的范围内。

我偶然发现了一个优雅的结构(见代码),我可以在其中插入我的分析代码,但该结构使用 alsaaudio,它仅适用于 Linux,并且不会写入文件。

我该如何将这段代码改写为使用 PyAudio,并通过 ffmpeg 将捕获的数据写入磁盘?

# https://gist.github.com/benhoyle/787bddf71f4c36a4b05a9746ea0885f6
# Building Audio/Video Capture Objects
# The best way to proceed is to have a module for each modality but with a common interface.
import threading

class SensorSource:
    """Abstract base class for a sensory modality captured on a background thread.

    Subclasses override update() (the thread body) and read() (thread-safe
    access to the latest data).
    """

    def __init__(self):
        """Initialise capture state.

        The original left these attributes unset, so calling start() or
        stop() on a source whose subclass forgot to set them raised
        AttributeError. Initialise them here so the lifecycle methods are
        always safe.
        """
        self.started = False
        self.thread = None

    def start(self):
        """Start the capture thread.

        Returns:
            self, so calls can be chained; None if capture was already
            running.
        """
        if self.started:
            print('[!] Asynchronous capturing has already been started.')
            return None
        self.started = True
        # daemon=True so a forgotten stop() cannot block interpreter exit.
        self.thread = threading.Thread(
            target=self.update,
            args=(),
            daemon=True
        )
        self.thread.start()
        return self

    def update(self):
        """Thread body: acquire new data. Overridden by subclasses."""
        pass

    def read(self):
        """Return the latest data. Overridden by subclasses."""
        pass

    def stop(self):
        """Signal the capture thread to stop and wait for it to finish."""
        self.started = False
        # Guard: stop() may be called before start() ever ran.
        if self.thread is not None:
            self.thread.join()
            self.thread = None


# Video source
import cv2
class VideoSource(SensorSource):
    """Threaded webcam capture using OpenCV.

    The background thread keeps self.frame updated with the most recent
    frame; read() hands out a thread-safe copy.
    """

    def __init__(self, src=0):
        """Initialise video capture.

        Args:
            src: OpenCV capture index or path (default 0, the first webcam).
        """
        # width=640, height=480
        self.src = src
        self.cap = cv2.VideoCapture(self.src)
        # self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        # self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        # Prime with one frame so read() has data before the thread starts.
        # NOTE: if the device fails to open, grabbed is False and frame is None.
        self.grabbed, self.frame = self.cap.read()
        self.started = False
        self.read_lock = threading.Lock()

    def update(self):
        """Thread body: continuously grab frames while started."""
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

    def read(self):
        """Return (grabbed, frame) where frame is a copy of the latest frame.

        Returns (grabbed, None) when no frame has ever been captured —
        the original called .copy() unconditionally and crashed with
        AttributeError on a failed camera.
        """
        with self.read_lock:
            grabbed = self.grabbed
            frame = self.frame.copy() if self.frame is not None else None
        return grabbed, frame

    def __enter__(self):
        """Support `with VideoSource() as v:` (original had only __exit__)."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the underlying OpenCV capture handle."""
        self.cap.release()


# Audio source
import struct
from collections import deque
import numpy as np
import logging
import alsaaudio

class AudioSource(SensorSource):
    """Threaded microphone capture using alsaaudio (Linux only).

    Keeps a rolling FIFO of the most recent nb_samples 16-bit mono samples.
    """

    def __init__(self, sample_freq=44100, nb_samples=65536):
        """Initialise audio capture.

        Args:
            sample_freq: sample rate in Hz.
            nb_samples: capacity of the rolling sample buffer.
        """
        # Initialise the ALSA PCM capture device.
        self.inp = alsaaudio.PCM(
            alsaaudio.PCM_CAPTURE,
            alsaaudio.PCM_NORMAL,
            device="default"
        )
        # set attributes: Mono, frequency, 16 bit little endian samples
        self.inp.setchannels(1)
        self.inp.setrate(sample_freq)
        self.inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
        self.inp.setperiodsize(512)
        # Rolling FIFO of the newest nb_samples samples, pre-filled with silence.
        self._s_fifo = deque([0] * nb_samples, maxlen=nb_samples)
        # Frame count of the most recent PCM read (exposed via read()).
        self.l = 0
        self.started = False
        # Single lock guarding the FIFO; the original created this lock
        # twice and silently discarded the first one.
        self.read_lock = threading.Lock()

    def update(self):
        """Thread body: continuously read PCM data into the FIFO."""
        while self.started:
            self.l, data = self.inp.read()
            if self.l > 0:
                # Unpack raw bytes into signed 16-bit samples ('h' = int16).
                raw_smp_l = struct.unpack('h' * self.l, data)
                with self.read_lock:
                    self._s_fifo.extend(raw_smp_l)
            else:
                # l <= 0 signals an ALSA overrun/error for this period.
                logging.error(
                    f'Sampler error occur (l={self.l} and len data={len(data)})'
                )

    def read(self):
        """Return (last_read_length, buffer) with buffer as an int16 ndarray."""
        with self.read_lock:
            return self.l, np.asarray(self._s_fifo, dtype=np.int16)


class CombinedSource:
    """Container that manages several SensorSource modalities together."""

    def __init__(self):
        """Initialise an empty name -> source mapping."""
        self.sources = dict()

    def add_source(self, source, name=None):
        """Register a source object.

        Args:
            source: an object derived from SensorSource.
            name: optional string key; defaults to the source's class name.
        """
        if not name:
            name = source.__class__.__name__
        self.sources[name] = source

    def start(self):
        """Start all registered sources."""
        for source in self.sources.values():
            source.start()

    def read(self):
        """Read from all sources.

        Returns:
            dict mapping source name -> data (the second element of each
            source's read() tuple; the status flag is dropped).
        """
        return {
            name: source.read()[1]
            for name, source in self.sources.items()
        }

    def stop(self):
        """Stop all registered sources."""
        for source in self.sources.values():
            source.stop()

    def _release_video(self):
        """Release OpenCV capture handles held by video sources.

        The original duplicated this loop in both __del__ and __exit__.
        """
        for source in self.sources.values():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()

    def __enter__(self):
        """Support the context-manager protocol (original had only __exit__)."""
        return self

    def __exit__(self, exec_type, exc_value, traceback):
        self._release_video()

    def __del__(self):
        self._release_video()


class AVCapture(CombinedSource):
    """CombinedSource pre-populated with one audio and one video source."""

    def __init__(self):
        """Initialise with the default microphone and webcam.

        Delegates to super().__init__() instead of re-implementing the
        parent's initialisation as the original did.
        """
        super().__init__()
        self.add_source(AudioSource(), "audio")
        self.add_source(VideoSource(), "video")

# Testing
# Some routines to test the objects - to be moved into a testing file.

class testing:
    """Ad-hoc smoke tests for the capture objects (to be moved to a test file).

    NOTE(review): the original methods had no parameter lists
    (``def make_sensor:``), which is a SyntaxError, and several bodies
    used undefined names (``v``, ``a``, ``c``, ``plt``). They are
    rewritten as valid methods that construct what they use.
    """

    def make_sensor(self):
        """A bare SensorSource can be constructed."""
        s = SensorSource()
        assert s.__class__.__name__ == "SensorSource"

    def make_video(self):
        """A VideoSource can be constructed (needs a webcam)."""
        v = VideoSource()
        assert v.__class__.__name__ == "VideoSource"

    def make_audio(self):
        """An AudioSource can be constructed (needs ALSA)."""
        a = AudioSource()
        assert a.__class__.__name__ == "AudioSource"

    def make_combined(self):
        """An empty CombinedSource can be constructed."""
        c = CombinedSource()
        assert c.__class__.__name__ == "CombinedSource"

    def test_capture(self):
        """Capture one frame and display its first channel."""
        import matplotlib.pyplot as plt
        v = VideoSource()
        v.start()
        assert v.cap.isOpened()
        g, d = v.read()
        assert g
        assert d.shape == (480, 640, 3)
        plt.imshow(d[:, :, 0])

    def test_combined_object(self):
        """Combine audio and video sources and read from both."""
        import matplotlib.pyplot as plt
        c = CombinedSource()
        c.add_source(AudioSource(), "audio")
        c.add_source(VideoSource(), "video")
        data = c.read()
        plt.plot(data["audio"])
        plt.imshow(data["video"][:, :, 0])
        c.stop()
        del c

    # Making It Easy
    def full_test(self):
        """End-to-end: auto-populated AVCapture starts and reads both streams."""
        import matplotlib.pyplot as plt
        av = AVCapture()
        assert av.__class__.__name__ == "AVCapture"
        av.start()
        source_names = [name for name, _ in av.sources.items()]
        assert "audio" in source_names
        assert "video" in source_names
        data = av.read()
        plt.imshow(data["video"][:, :, 0])
        plt.plot(data["audio"])
python audio video webcam video-capture
© www.soinside.com 2019 - 2024. All rights reserved.