# I am working on a project that involves compressing videos with OpenCV.
import time
import os
import numpy as np
import cv2
import dask
from dask import delayed, compute
import dask.bag as db
import math
import multiprocessing
from multiprocessing.pool import ThreadPool
import asyncio
import logging
import psutil
import matplotlib.pyplot as plt
# Configure the root logger: INFO and above are written to
# video_compression.log, each record prefixed with a timestamp and its level.
logging.basicConfig(
    level=logging.INFO,
    filename='video_compression.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Check if a video file is valid
def validate_video_file(file_path):
    """Return True if *file_path* exists and OpenCV can open it as a video.

    Problems are reported through ``logging.error`` instead of being raised.

    Args:
        file_path: Path of the video file to check.

    Returns:
        bool: True when the path exists and ``cv2.VideoCapture`` reports the
        file as opened; False otherwise.
    """
    if not os.path.exists(file_path):
        logging.error(f"File {file_path} not found.")
        return False
    cap = cv2.VideoCapture(file_path)
    try:
        if not cap.isOpened():
            logging.error(f"Unable to open video file {file_path}")
            return False
        return True
    finally:
        # The capture is only a probe; release it so the file handle/decoder
        # is not left open for every validated video.
        cap.release()
# Display progress bar in the console
def display_progress_bar(progress, total, bar_length=50):
    """Redraw a single-line red progress bar on the console.

    Args:
        progress: Number of units completed so far.
        total: Number of units expected. A value <= 0 (e.g. an unknown frame
            count) is rendered as an empty bar instead of dividing by zero.
        bar_length: Width of the bar in characters.
    """
    if total > 0:
        # Clamp so an over- or under-shooting counter cannot overflow the bar.
        fraction = min(max(progress / total, 0.0), 1.0)
    else:
        fraction = 0.0
    filled = int(bar_length * fraction)
    percent = int(fraction * 100)
    bar = '█' * filled + '-' * (bar_length - filled)
    # No newline is emitted ('\r' rewinds the line), so flush explicitly;
    # otherwise a line-buffered stdout may never show the update.
    print(f"\r\033[91m[{bar}] {percent}%\033[0m", end='', flush=True)
# Async frame reading utility
async def read_frame(cap):
    """Call ``cap.read`` in the loop's default executor and await its result.

    Running the blocking read in an executor keeps the event loop free while
    the frame is being fetched.

    Args:
        cap: Object exposing a blocking ``read()`` method (a
            ``cv2.VideoCapture`` in this file).

    Returns:
        Whatever ``cap.read()`` returns.
    """
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; get_event_loop() is discouraged there.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, cap.read)
# Process frame batches with optional GPU support and other operations
def process_frame_batch(batch, out, **kwargs):
    """Apply the requested per-frame transforms and write each frame to *out*.

    Args:
        batch: Iterable of BGR frames (as produced by ``cv2.VideoCapture.read``).
        out: Writer exposing ``write(frame)`` (a ``cv2.VideoWriter`` in this
            file).
        **kwargs: Optional settings.
            grayscale (bool): Convert every frame to grayscale. The CUDA path
                is used when a CUDA device is reported, the CPU path otherwise.
            resolution (tuple): ``(width, height)`` handed to ``cv2.resize``.
            Any other key (e.g. ``fps``) is accepted and ignored here: the
            callers in this file pass the frame rate to the ``VideoWriter``
            constructor, and ``CAP_PROP_FPS`` is a capture property, not a
            writer property.
    """
    want_gray = bool(kwargs.get('grayscale', False))
    target_size = kwargs.get('resolution')
    # Probe for CUDA once per batch, and only when grayscale was requested.
    use_cuda = want_gray and cv2.cuda.getCudaEnabledDeviceCount() > 0

    for frame in batch:
        if want_gray:
            if use_cuda:
                # cv2.cuda functions operate on GpuMat, not NumPy arrays.
                gpu_frame = cv2.cuda_GpuMat()
                gpu_frame.upload(frame)
                gray = cv2.cuda.cvtColor(gpu_frame, cv2.COLOR_BGR2GRAY).download()
            else:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # The writers in this file are created in the default colour mode,
            # so expand back to three channels before writing.
            frame = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)

        if target_size is not None:
            frame = cv2.resize(frame, tuple(target_size))

        out.write(frame)
# Sequential video compression with batch processing and memory-efficient loading
def compress_videos_sequential(input_videos, output_folder, test_name, **kwargs):
    """Compress the given videos one after another and time each of them.

    Frames are read in batches sized from the currently available memory and
    handed to ``process_frame_batch`` together with the XVID writer.

    Args:
        input_videos: Iterable of input video paths.
        output_folder: Directory for the output files; created if missing.
        test_name: Label embedded in each output file name.
        **kwargs: Forwarded to ``process_frame_batch``. ``fps`` (default 20.0)
            and ``resolution`` (default ``(640, 480)``) also configure the
            writer.

    Returns:
        list[float]: Elapsed seconds per successfully validated video, in
        input order. Videos that fail validation are skipped.
    """
    os.makedirs(output_folder, exist_ok=True)
    times = []
    for video in input_videos:
        if not validate_video_file(video):
            logging.error(f"Skipping {video} due to validation failure.")
            continue

        start_time = time.time()
        # The writer has a fixed frame size, so always pass a resolution down
        # to the batch processor to keep frames and writer in agreement.
        frame_size = tuple(kwargs.get('resolution') or (640, 480))
        batch_kwargs = dict(kwargs)
        batch_kwargs['resolution'] = frame_size

        cap = cv2.VideoCapture(video)
        out = None
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            src_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            src_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            if src_width <= 0 or src_height <= 0:
                src_width, src_height = 640, 480  # size unknown: assume VGA

            # Auto-adjust batch size: raw BGR frames are buffered at source
            # size; budget half of the free memory and keep at least 1 frame.
            bytes_per_frame = src_width * src_height * 3
            memory_budget = psutil.virtual_memory().available // 2
            batch_size = max(1, min(2000, int(memory_budget // bytes_per_frame)))

            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            output_path = os.path.join(output_folder, f"compressed_{test_name}_{os.path.basename(video)}.avi")
            out = cv2.VideoWriter(output_path, fourcc, kwargs.get('fps', 20.0), frame_size)

            # Report roughly every 10% of frames; never use a zero step when
            # the container does not report a frame count.
            progress_step = max(1, math.ceil(total_frames / 10))
            frame_batch = []
            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break  # end of stream or read failure
                frame_batch.append(frame)
                frame_count += 1

                if len(frame_batch) >= batch_size:
                    process_frame_batch(frame_batch, out, **batch_kwargs)
                    frame_batch = []

                if frame_count % progress_step == 0:
                    memory_usage = psutil.virtual_memory().percent
                    cpu_usage = psutil.cpu_percent(interval=0.1)
                    disk_usage = psutil.disk_usage(output_folder).percent
                    logging.info(f"Memory Usage: {memory_usage}%, CPU Usage: {cpu_usage}%, Disk Usage: {disk_usage}%")
                    if total_frames > 0:
                        display_progress_bar(frame_count, total_frames)

            # Process remaining frames in the last, partially filled batch.
            if frame_batch:
                process_frame_batch(frame_batch, out, **batch_kwargs)
        finally:
            # Release in all cases so the output file is finalized and the
            # input handle is closed even if processing raised.
            cap.release()
            if out is not None:
                out.release()

        times.append(time.time() - start_time)
        logging.info(f"Compression time for {video}: {times[-1]:.2f}s")
    return times
# Parallel video compression using Dask with optimized processing
def compress_video_dask(video, output_folder, test_name, batch_size=500, **kwargs):
    """Compress a single video; written to be run as one parallel task.

    Args:
        video: Path of the input video.
        output_folder: Directory for the output file; created if missing.
        test_name: Label embedded in the output file name.
        batch_size: Number of frames buffered before they are processed;
            values below 1 are treated as 1.
        **kwargs: Forwarded to ``process_frame_batch``. ``fps`` (default 20.0)
            and ``resolution`` (default ``(640, 480)``) also configure the
            writer.

    Returns:
        Elapsed seconds as a float, or ``0`` when the video fails validation.
    """
    if not validate_video_file(video):
        logging.error(f"Skipping {video} due to validation failure.")
        return 0

    start_time = time.time()
    os.makedirs(output_folder, exist_ok=True)
    batch_size = max(1, int(batch_size))
    # The writer has a fixed frame size, so always pass a resolution down to
    # the batch processor to keep frames and writer in agreement.
    frame_size = tuple(kwargs.get('resolution') or (640, 480))
    batch_kwargs = dict(kwargs)
    batch_kwargs['resolution'] = frame_size

    cap = cv2.VideoCapture(video)
    out = None
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        output_path = os.path.join(output_folder, f"compressed_{test_name}_{os.path.basename(video)}.avi")
        out = cv2.VideoWriter(output_path, fourcc, kwargs.get('fps', 20.0), frame_size)

        # Throttle progress updates to roughly every 10% of frames; the step
        # is clamped so an unknown (zero) frame count cannot divide by zero.
        progress_step = max(1, math.ceil(total_frames / 10))
        frame_batch = []
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # end of stream or read failure
            frame_batch.append(frame)
            frame_count += 1

            if len(frame_batch) >= batch_size:
                process_frame_batch(frame_batch, out, **batch_kwargs)
                frame_batch = []

            if total_frames > 0 and frame_count % progress_step == 0:
                display_progress_bar(frame_count, total_frames)

        # Process remaining frames in the last, partially filled batch.
        if frame_batch:
            process_frame_batch(frame_batch, out, **batch_kwargs)
    finally:
        # Release in all cases so the output file is finalized and the input
        # handle is closed even if processing raised.
        cap.release()
        if out is not None:
            out.release()
    return time.time() - start_time
# Optimized Dask parallel compression with dynamic worker pool management
def compress_videos_parallel(input_videos, output_folder, scheduler='threads', **kwargs):
    """Compress several videos concurrently, one Dask task per video.

    Args:
        input_videos: Sized collection of input video paths.
        output_folder: Directory handed to ``compress_video_dask``.
        scheduler: Dask scheduler name passed to ``compute``.
        **kwargs: Forwarded to ``compress_video_dask``. ``test_name`` is
            required by that function; ``"parallel"`` is used when the caller
            does not provide one.

    Returns:
        tuple: One result of ``compress_video_dask`` per input video, in input
        order; an empty tuple when there are no inputs.
    """
    if not input_videos:
        return ()

    # Limit the pool to the available CPUs or the number of videos.
    pool_size = max(1, min(len(input_videos), multiprocessing.cpu_count()))
    task_kwargs = dict(kwargs)
    task_kwargs.setdefault('test_name', 'parallel')

    # Wrap the call with delayed() so nothing runs until compute(); calling
    # the function directly would compress every video serially right here.
    tasks = [
        delayed(compress_video_dask)(video, output_folder, **task_kwargs)
        for video in input_videos
    ]
    return compute(*tasks, scheduler=scheduler, num_workers=pool_size)
# Enhanced test runner with benchmarking and logging
def run_tests(test_cases, scheduler='threads'):
    """Benchmark sequential vs. Dask compression for each test case and plot it.

    For every case the wall-clock time of both strategies is printed, logged
    and collected; afterwards a grouped bar chart of the timings is shown.

    Args:
        test_cases: Sequence of dicts with the keys ``name`` (label),
            ``videos`` (list of input paths) and ``params`` (keyword arguments
            forwarded to the compression functions).
        scheduler: Dask scheduler name used for the parallel run.
    """
    sequential_times = []
    dask_times = []
    for case_number, test in enumerate(test_cases, start=1):
        test_name = f"test_{case_number}_{test['name']}"  # Unique test name for each case
        print(f"\nRunning Test Case {case_number}: {test['name']}")
        logging.info(f"Test Case {case_number}: {test['name']} started.")

        # Sequential compression
        print("\nRunning sequential compression...")
        sequential_start_time = time.time()
        compress_videos_sequential(test['videos'], 'output_sequential', test_name, **test['params'])
        total_seq_time = time.time() - sequential_start_time
        print(f"Sequential Compression Time: {total_seq_time:.2f}s")

        # Dask compression, timed separately from the sequential run
        print("\nRunning Dask parallel compression...")
        dask_start_time = time.time()
        compress_videos_parallel(test['videos'], 'output_dask', scheduler=scheduler, test_name=test_name, **test['params'])
        total_dask_time = time.time() - dask_start_time
        print(f"Dask Parallel Compression Time: {total_dask_time:.2f}s")

        # Keep one timing per case so the chart gets as many bars as labels.
        sequential_times.append(total_seq_time)
        dask_times.append(total_dask_time)

        total_time_test = total_seq_time + total_dask_time
        logging.info(f"Test Case {case_number}: {test['name']} completed in {total_time_test:.2f}s.")
        print(f"Total Test Time: {total_time_test:.2f}s")

    # Plot side-by-side bar chart
    plt.figure(figsize=(10, 6))
    index = range(len(test_cases))
    bar_width = 0.35
    plt.bar(index, sequential_times, bar_width, label='Sequential')
    plt.bar([i + bar_width for i in index], dask_times, bar_width, label='Dask Parallel')
    plt.xlabel('Test Cases')
    plt.ylabel('Time (seconds)')
    plt.title('Compression Time Comparison: Sequential vs Dask Parallel (Optimized)')
    plt.xticks([i + bar_width / 2 for i in index], [tc['name'] for tc in test_cases], rotation=45, ha='right')
    plt.legend()  # the bars carry labels, so render them
    plt.tight_layout()  # keep the rotated tick labels inside the figure
    plt.show()
# Example test cases
# Each case names the benchmark, lists its input videos and carries the keyword
# arguments ("params") forwarded to the compression functions.
test_cases = [
    {"name": "Default Compression", "videos": ["suns.mp4", "suns1.mp4", "suns2.mp4"], "params": {}},
    {"name": "Low Bitrate Compression", "videos": ["suns.mp4", "suns1.mp4", "suns2.mp4"], "params": {"bitrate": "500k"}},
    {"name": "High Bitrate Compression", "videos": ["suns.mp4", "suns1.mp4", "suns2.mp4"], "params": {"bitrate": "3000k"}},
]

if __name__ == "__main__":
    # 'threads' can be changed to 'processes' or 'single-threaded' if needed.
    run_tests(test_cases, scheduler='threads')