我正在开发一个项目,使用 RabbitMQ(AMQP 协议)传输 H.264 视频文件并将其显示在 Web 应用程序中。整体流程包括:捕获视频帧、对其进行编码、发送到 RabbitMQ,然后在 Web 应用程序端使用 Flask 和 Flask-SocketIO 消费并解码这些帧。
但是,我在 RabbitMQ 中的发布和订阅速率方面遇到了性能问题。我似乎无法达到每秒超过 10 条消息。这不足以实现流畅的视频流。 我需要帮助来诊断和解决这些性能瓶颈。
这是我的代码:
# RabbitMQ setup
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
# Routing key and queue are derived from the camera location so each
# camera gets its own binding on the 'DRONE' direct exchange.
KEY = f'DRONE_{CAM_LOCATION}'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'
# Path to the H.264 video file
VIDEO_FILE_PATH = 'videos/FPV.h264'
# Configure logging
logging.basicConfig(level=logging.INFO)
@contextmanager
def rabbitmq_channel(host):
    """Context manager to handle RabbitMQ channel setup and teardown.

    Parameters:
        host: hostname of the RabbitMQ broker.

    Yields:
        An open pika channel; the underlying connection is always closed
        on exit, even if channel creation or the body raises.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    try:
        # Create the channel inside the try block: if channel() raises,
        # the original code leaked the already-open connection.
        channel = connection.channel()
        yield channel
    finally:
        connection.close()
def initialize_rabbitmq(channel):
    """Initialize RabbitMQ exchange and queue, and bind them together.

    Declarations are idempotent on the broker side; order matters —
    the exchange and queue must exist before the binding is created.
    """
    channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
    channel.queue_declare(queue=QUEUE_NAME)
    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=KEY)
def send_frame(channel, frame):
    """Encode one RGB frame to H.264 via ffmpeg and publish it to RabbitMQ.

    The message body is an 8-byte packed double (publish timestamp)
    followed by the H.264 bitstream produced by ffmpeg.

    Parameters:
        channel: open pika channel to publish on.
        frame: uint8 ndarray of shape (height, width, 3) in RGB order;
               must be contiguous (tobytes() is sent to ffmpeg stdin).

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.

    NOTE(review): spawning a fresh ffmpeg process for every frame is the
    dominant cost here — process startup alone caps throughput around
    10 msg/s. A long-lived encoder (e.g. PyAV, or one persistent ffmpeg
    pipe) would remove this bottleneck entirely.
    """
    ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
    cmd = [
        ffmpeg_path,
        '-f', 'rawvideo',
        '-pix_fmt', 'rgb24',
        # ffmpeg wants WIDTHxHEIGHT; numpy shape is (rows, cols) = (h, w).
        '-s', '{}x{}'.format(frame.shape[1], frame.shape[0]),
        '-i', 'pipe:0',
        '-f', 'h264',
        '-vcodec', 'libx264',
        '-pix_fmt', 'yuv420p',
        '-preset', 'ultrafast',  # fastest x264 preset, lowest latency
        'pipe:1'
    ]
    start_time = time.time()
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate(input=frame.tobytes())
    encoding_time = time.time() - start_time
    if process.returncode != 0:
        logging.error("ffmpeg error: %s", err.decode())
        raise RuntimeError("ffmpeg error")
    frame_size = len(out)
    logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, frame_size)
    timestamp = time.time()
    formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
    # Lazy %-style args (instead of f-strings) skip formatting work when
    # INFO logging is disabled — this runs once per frame.
    logging.info("Timestamp: %s", timestamp)
    logging.info("Formatted Timestamp: %s", formatted_timestamp[:-3])
    timestamp_bytes = struct.pack('d', timestamp)  # 8-byte double prefix
    message_body = timestamp_bytes + out
    channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
    logging.info("Encoding time: %.4f seconds", encoding_time)
def capture_video(channel):
    """Read video from the file, encode frames, and send them to RabbitMQ.

    Also shows a local preview window; press 'q' to stop early.

    NOTE(review): the source file is already H.264 — decoding it with
    OpenCV and re-encoding every frame in send_frame is pure overhead.
    Demuxing the file packet-by-packet (e.g. with PyAV) and publishing
    the packets directly would skip both steps. The per-frame
    imshow/waitKey also adds latency to the publish loop.
    """
    if not os.path.exists(VIDEO_FILE_PATH):
        logging.error("Error: Video file does not exist.")
        return
    cap = cv2.VideoCapture(VIDEO_FILE_PATH)
    if not cap.isOpened():
        logging.error("Error: Could not open video file.")
        return
    try:
        while True:
            start_time = time.time()
            ret, frame = cap.read()
            read_time = time.time() - start_time
            if not ret:
                break
            # OpenCV yields BGR, but send_frame declares rgb24 input to ffmpeg.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb = np.ascontiguousarray(frame_rgb)  # tobytes() needs contiguous data
            send_frame(channel, frame_rgb)
            cv2.imshow('Video', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # Lazy %-args instead of an f-string in this per-frame hot loop.
            logging.info("Read time: %.4f seconds", read_time)
    finally:
        cap.release()
        cv2.destroyAllWindows()
# --- Consumer / web-app side ---
app = Flask(__name__)
CORS(app)  # allow cross-origin requests from the viewer page
socketio = SocketIO(app, cors_allowed_origins="*")
# NOTE(review): these constants duplicate the publisher's configuration;
# both sides must stay in sync or routing silently breaks.
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'
def initialize_rabbitmq():
    """Open a blocking connection and declare the exchange/queue/binding.

    Returns:
        (connection, channel) tuple; the caller owns both and must close
        the connection when done.
    """
    routing_key = f'DRONE_{CAM_LOCATION}'
    connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
    channel = connection.channel()
    # Idempotent declarations: exchange and queue first, then the binding.
    channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
    channel.queue_declare(queue=QUEUE_NAME)
    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=routing_key)
    return connection, channel
def decode_frame(frame_data):
    """Decode raw H.264 bytes into a BGR frame via an ffmpeg subprocess.

    Parameters:
        frame_data: H.264 bitstream bytes for (at least) one frame.

    Returns:
        uint8 ndarray of shape (960, 1280, 3) in BGR order, or None if
        ffmpeg fails or produces an unexpected amount of data.

    NOTE(review): launching a full ffmpeg process per frame is the main
    consumer-side bottleneck; a persistent decoder (PyAV CodecContext fed
    packet-by-packet) would be far faster.
    """
    # FFmpeg command to decode H.264 frame data
    ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
    cmd = [
        ffmpeg_path,
        '-f', 'h264',
        '-i', 'pipe:0',
        '-pix_fmt', 'bgr24',
        '-vcodec', 'rawvideo',
        '-an', '-sn',
        '-f', 'rawvideo',
        'pipe:1'
    ]
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    start_time = time.time()  # Start timing the decoding process
    out, err = process.communicate(input=frame_data)
    decoding_time = time.time() - start_time  # Calculate decoding time
    if process.returncode != 0:
        print("ffmpeg error: ", err.decode())
        return None
    # Hard-coded dimensions expected by the frontend — presumably matches
    # the source video; TODO confirm against the actual stream resolution.
    frame_size = (960, 1280, 3)
    expected_bytes = frame_size[0] * frame_size[1] * frame_size[2]
    if len(out) != expected_bytes:
        # Guard the reshape: a size mismatch previously raised an
        # unhandled ValueError and killed the consumer loop.
        print(f"decode error: got {len(out)} bytes, expected {expected_bytes}")
        return None
    frame = np.frombuffer(out, np.uint8).reshape(frame_size)
    print(f"Decoding time: {decoding_time:.4f} seconds")
    return frame
def format_timestamp(ts):
    """Render a UNIX timestamp as ``HH:MM:SS.mmm`` in local time."""
    # %f gives microseconds; dropping the last three digits leaves ms.
    return datetime.fromtimestamp(ts).strftime('%H:%M:%S.%f')[:-3]
def rabbitmq_consumer():
    """Consume video messages, decode them, and push JPEG frames to the
    browser over Socket.IO. Runs forever; intended for a daemon thread."""
    connection, channel = initialize_rabbitmq()
    for method_frame, properties, body in channel.consume(QUEUE_NAME):
        message_receive_time = time.time()  # Time when the message is received
        # Extract the timestamp from the message body
        timestamp_bytes = body[:8]  # 8-byte packed double written by the publisher
        frame_data = body[8:]  # remainder is the H.264 bitstream
        publish_timestamp = struct.unpack('d', timestamp_bytes)[0]
        print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
        print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")
        frame = decode_frame(frame_data)
        decode_time = time.time() - message_receive_time  # Calculate decode time
        # NOTE(review): decode_time is computed but never logged or used.
        if frame is not None:
            # Re-encode as JPEG for the browser; frame_data is reused here
            # to hold the JPEG bytes (shadows the H.264 payload above).
            _, buffer = cv2.imencode('.jpg', frame)
            frame_data = buffer.tobytes()
            socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
            emit_time = time.time()  # Time after emitting the frame
            # Log the time taken to emit the frame and its size
            rtt = emit_time - publish_timestamp  # Calculate RTT from publish to emit
            print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
            print(f"RTT: {rtt:.4f} seconds")
            print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")
        # Ack even when decoding failed so the queue does not back up.
        channel.basic_ack(method_frame.delivery_tag)
@app.route('/')
def index():
    """Serve the single-page video viewer."""
    return render_template('index.html')
@socketio.on('connect')
def handle_connect():
    """Log Socket.IO client connections."""
    print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
    """Log Socket.IO client disconnections."""
    print('Client disconnected')
if __name__ == '__main__':
    # Run the RabbitMQ consumer in the background; daemon=True ensures the
    # thread dies together with the web server process.
    consumer_thread = threading.Thread(target=rabbitmq_consumer, daemon=True)
    consumer_thread.start()
    socketio.run(app, host='0.0.0.0', port=5000)
如何优化发布和订阅速率以处理每秒更多的消息?
任何帮助或建议将不胜感激!
我尝试使用线程和多处理来同时处理多个帧,并尝试优化帧解码功能以使其更快,但没有成功。
首先,我对 RabbitMQ 不太了解,但我认为它每秒能处理的消息远不止 10 条。
您的设计存在几个问题:

1. 您用 cv2 把视频文件解码成 RGB 帧,再重新编码为 H.264——而该文件本身已经是 H.264 编码的,这完全是多余的开销。改用 PyAV 按数据包(packet)读取文件,发送时就不需要任何重新编码步骤。

2. 解码端同样对每一帧都启动一个完整的 ffmpeg 进程,和编码端的问题一样。改用 PyAV,把数据包以流的方式持续喂给同一个解码器即可。

3. 总之,要消除"每帧启动一次进程"的做法。如果确实想用子进程,就只启动一个长驻进程,并通过管道(pipe)持续读写。

不过 PyAV 对开发者更友好,还提供了不少实用功能,比自己管理管道省事得多。