通过 RabbitMQ 进行 H.264 视频流的发布和订阅速率问题

问题描述 投票:0回答:1

我正在开发一个项目,使用 RabbitMQ(AMQP 协议)传输 H.264 视频文件并将其显示在 Web 应用程序中。该设置包括捕获视频帧、对其进行编码、将其发送到 RabbitMQ,然后使用 Flask 和 Flask-SocketIO 在 Web 应用程序端使用和解码它们。

但是,我在 RabbitMQ 中的发布和订阅速率方面遇到了性能问题。我似乎无法达到每秒超过 10 条消息。这不足以实现流畅的视频流。 我需要帮助来诊断和解决这些性能瓶颈。

这是我的代码:

  • 视频采集和发布脚本:
# RabbitMQ setup
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
KEY = f'DRONE_{CAM_LOCATION}'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

# Path to the H.264 video file
VIDEO_FILE_PATH = 'videos/FPV.h264'

# Configure logging
logging.basicConfig(level=logging.INFO)

@contextmanager
def rabbitmq_channel(host):
    """Context manager to handle RabbitMQ channel setup and teardown."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    try:
        yield channel
    finally:
        connection.close()

def initialize_rabbitmq(channel):
    """Initialize RabbitMQ exchange and queue, and bind them together."""
    channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
    channel.queue_declare(queue=QUEUE_NAME)
    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=KEY)

def send_frame(channel, frame):
    """Encode the video frame using FFmpeg and send it to RabbitMQ."""
    ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
    cmd = [
        ffmpeg_path,
        '-f', 'rawvideo',
        '-pix_fmt', 'rgb24',
        '-s', '{}x{}'.format(frame.shape[1], frame.shape[0]),
        '-i', 'pipe:0',
        '-f', 'h264',
        '-vcodec', 'libx264',
        '-pix_fmt', 'yuv420p',
        '-preset', 'ultrafast',
        'pipe:1'
    ]
    
    start_time = time.time()
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate(input=frame.tobytes())
    encoding_time = time.time() - start_time
    
    if process.returncode != 0:
        logging.error("ffmpeg error: %s", err.decode())
        raise RuntimeError("ffmpeg error")
    
    frame_size = len(out)
    logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, frame_size)
    timestamp = time.time()
    formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
    logging.info(f"Timestamp: {timestamp}") 
    logging.info(f"Formatted Timestamp: {formatted_timestamp[:-3]}")
    timestamp_bytes = struct.pack('d', timestamp)
    message_body = timestamp_bytes + out
    channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
    logging.info(f"Encoding time: {encoding_time:.4f} seconds")

def capture_video(channel):
    """Read video from the file, encode frames, and send them to RabbitMQ."""
    if not os.path.exists(VIDEO_FILE_PATH):
        logging.error("Error: Video file does not exist.")
        return
    cap = cv2.VideoCapture(VIDEO_FILE_PATH)
    if not cap.isOpened():
        logging.error("Error: Could not open video file.")
        return
    try:
        while True:
            start_time = time.time()
            ret, frame = cap.read()
            read_time = time.time() - start_time
            if not ret:
                break
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb = np.ascontiguousarray(frame_rgb) # Ensure the frame is contiguous
            send_frame(channel, frame_rgb)
            cv2.imshow('Video', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            logging.info(f"Read time: {read_time:.4f} seconds")
    finally:
        cap.release()
        cv2.destroyAllWindows()
  • 后端(烧瓶):
app = Flask(__name__)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*")

RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

def initialize_rabbitmq():
    connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
    channel = connection.channel()
    channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
    channel.queue_declare(queue=QUEUE_NAME)
    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=f'DRONE_{CAM_LOCATION}')
    return connection, channel

def decode_frame(frame_data):
    # FFmpeg command to decode H.264 frame data
    ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
    cmd = [
        ffmpeg_path,
        '-f', 'h264',
        '-i', 'pipe:0',
        '-pix_fmt', 'bgr24',
        '-vcodec', 'rawvideo',
        '-an', '-sn',
        '-f', 'rawvideo',
        'pipe:1'
    ]
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    start_time = time.time()  # Start timing the decoding process
    out, err = process.communicate(input=frame_data)
    decoding_time = time.time() - start_time  # Calculate decoding time
    
    if process.returncode != 0:
        print("ffmpeg error: ", err.decode())
        return None
    frame_size = (960, 1280, 3)  # frame dimensions expected by the frontend
    frame = np.frombuffer(out, np.uint8).reshape(frame_size)
    print(f"Decoding time: {decoding_time:.4f} seconds")
    return frame

def format_timestamp(ts):
    dt = datetime.fromtimestamp(ts)
    return dt.strftime('%H:%M:%S.%f')[:-3]

def rabbitmq_consumer():
    connection, channel = initialize_rabbitmq()
    for method_frame, properties, body in channel.consume(QUEUE_NAME):
        message_receive_time = time.time()  # Time when the message is received

        # Extract the timestamp from the message body
        timestamp_bytes = body[:8]
        frame_data = body[8:]
        publish_timestamp = struct.unpack('d', timestamp_bytes)[0]

        print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
        print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")

        frame = decode_frame(frame_data)
        decode_time = time.time() - message_receive_time  # Calculate decode time

        if frame is not None:
            _, buffer = cv2.imencode('.jpg', frame)
            frame_data = buffer.tobytes()
            socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
            emit_time = time.time()  # Time after emitting the frame

            # Log the time taken to emit the frame and its size
            rtt = emit_time - publish_timestamp  # Calculate RTT from publish to emit
            print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
            print(f"RTT: {rtt:.4f} seconds")
            print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")
        channel.basic_ack(method_frame.delivery_tag)

@app.route('/')
def index():
    return render_template('index.html')

@socketio.on('connect')
def handle_connect():
    print('Client connected')

@socketio.on('disconnect')
def handle_disconnect():
    print('Client disconnected')

if __name__ == '__main__':
    consumer_thread = threading.Thread(target=rabbitmq_consumer)
    consumer_thread.daemon = True
    consumer_thread.start()
    socketio.run(app, host='0.0.0.0', port=5000)

如何优化发布和订阅速率以处理每秒更多的消息?

任何帮助或建议将不胜感激!

我尝试使用线程和多处理来同时处理多个帧,并尝试优化帧解码功能以使其更快,但没有成功。

python ffmpeg rabbitmq video-streaming h.264
1个回答
0
投票

首先我对rabbitmq不太了解,但我认为它每秒可以处理超过10条消息。

您有一些设计问题,

  1. 您通过cv2将视频文件读取为rgb并重新编码为h264。该文件已经是 h264 编码的。它只是开销。使用 pyav 按数据包读取文件,这样您在发送时不需要重新编码步骤。

  2. 您对每个帧执行整个 ffmpeg 过程进行解码,就像在编码步骤中一样,使用 pyav 将包作为像 Thingy 一样的流提供给解码器。

接下来,您将删除每帧的单个过程执行。如果您想使用 Procs,请启动一次与 Pipes 一起使用的操作。

但是 pyav 对开发人员更加友好,并且为您提供更多很酷的东西,就像使用管道一样

© www.soinside.com 2019 - 2024. All rights reserved.