OpenCV 视频压缩器无法播放视频文件

问题描述 投票:0回答:1

我正在开发一个使用 OpenCV 实现视频压缩器的项目。压缩器已经实现,并且可以针对不同的测试用例运行,代码如下:

import time
import os
import numpy as np
import cv2
import dask
from dask import delayed, compute
import dask.bag as db
import math
import multiprocessing
from multiprocessing.pool import ThreadPool
import asyncio
import logging
import psutil
import matplotlib.pyplot as plt


# Set up logging: records at INFO level and above go to video_compression.log
# (path is relative to the current working directory), formatted as
# "<timestamp> - <level name> - <message>".
logging.basicConfig(filename='video_compression.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')


# Check that a video file exists and can be opened by OpenCV
def validate_video_file(file_path):
    """Return True when *file_path* exists and ``cv2.VideoCapture`` can open it.

    Each failure reason is logged at ERROR level before False is returned.
    The capture handle opened for the probe is always released.
    """
    if os.path.exists(file_path):
        probe = cv2.VideoCapture(file_path)
        opened = probe.isOpened()
        probe.release()
        if opened:
            return True
        logging.error(f"Unable to open video file {file_path}")
        return False

    logging.error(f"File {file_path} not found.")
    return False


# Display progress bar in the console
def display_progress_bar(progress, total, bar_length=50):
    """Redraw a one-line red progress bar on stdout.

    Args:
        progress: Number of units completed so far.
        total: Total number of units. When it is zero or negative (for
            example an unknown frame count) the bar is drawn empty at 0%
            instead of raising ZeroDivisionError.
        bar_length: Width of the bar in characters.

    The completed fraction is clamped to the range 0..1 so the bar never
    grows past ``bar_length`` when ``progress`` exceeds ``total``.
    """
    ratio = progress / total if total > 0 else 0.0
    ratio = min(max(ratio, 0.0), 1.0)

    filled = int(bar_length * ratio)
    bar = '█' * filled + '-' * (bar_length - filled)
    percent = int(ratio * 100)
    # "\r" moves the cursor back to column 0 so successive calls overwrite the
    # same console line; \033[91m and \033[0m are the ANSI "bright red" and
    # "reset" escape sequences.
    print(f"\r\033[91m[{bar}] {percent}%\033[0m", end='')


# Async frame reading utility
async def read_frame(cap):
    """Call the blocking ``cap.read`` in the event loop's default executor.

    Args:
        cap: Object exposing a blocking ``read()`` method, such as a
            ``cv2.VideoCapture``.

    Returns:
        Whatever ``cap.read()`` returns.
    """
    # Inside a coroutine a loop is always running, and get_running_loop() is
    # the accessor the asyncio documentation recommends over get_event_loop().
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, cap.read)


# Process frame batches (optional grayscale, resize to the writer's frame size)
def process_frame_batch(batch, out, **kwargs):
    """Apply the requested per-frame operations and write each frame to *out*.

    Args:
        batch: Iterable of frames. Each frame is expected to be a BGR numpy
            array as produced by ``cv2.VideoCapture.read``.
        out: Object with a ``write(frame)`` method, normally an opened
            ``cv2.VideoWriter``.
        **kwargs: Optional settings.
            grayscale (bool): convert every frame to grayscale.
            resolution ((width, height)): size every frame is resized to.
                Defaults to (640, 480), the same default the callers in this
                file use when they open the writer.
            Other keys (e.g. ``fps``) are accepted and ignored here; the
            frame rate is fixed when the writer is constructed.
    """
    want_gray = kwargs.get('grayscale', False)
    target_size = tuple(kwargs.get('resolution', (640, 480)))

    for frame in batch:
        if want_gray:
            # The CPU path is used unconditionally: the cv2.cuda variant
            # operates on GpuMat objects, not on the numpy frames held here.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # The writer is opened in colour mode by the callers, so expand
            # the single-channel image back to three channels before writing.
            frame = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)

        # A VideoWriter only accepts frames of the size it was opened with,
        # so bring every frame to the target size before writing.
        height, width = frame.shape[:2]
        if (width, height) != target_size:
            frame = cv2.resize(frame, target_size)

        out.write(frame)


# Sequential video compression with batch processing and memory-efficient loading
def compress_videos_sequential(input_videos, output_folder, test_name, **kwargs):
    """Re-encode each input video to an XVID ``.avi`` file, one after another.

    Args:
        input_videos: Iterable of input video paths.
        output_folder: Directory for the output files; created if missing.
        test_name: Label embedded in every output file name.
        **kwargs: Optional settings. ``fps`` (default 20.0) and ``resolution``
            (``(width, height)``, default (640, 480)) configure the writer;
            all keys are forwarded to ``process_frame_batch``.

    Returns:
        list[float]: Wall-clock seconds per processed video. Videos that fail
        validation or whose writer cannot be opened contribute no entry.
    """
    times = []

    # Create the output directory up front; psutil.disk_usage below also
    # needs the path to exist.
    os.makedirs(output_folder, exist_ok=True)

    frame_size = tuple(kwargs.get('resolution', (640, 480)))
    fps = kwargs.get('fps', 20.0)
    # NOTE(review): 20.0 fps is used whenever no 'fps' is given, regardless of
    # the source frame rate - confirm that is intended.
    # Always pass the writer's frame size on to the batch processor so every
    # written frame has the size the writer was opened with.
    batch_kwargs = {**kwargs, 'resolution': frame_size}

    for video in input_videos:
        if not validate_video_file(video):
            logging.error(f"Skipping {video} due to validation failure.")
            continue

        start_time = time.time()
        cap = cv2.VideoCapture(video)
        out = None
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Size the batch from the memory one decoded source frame needs,
            # using at most half of the currently available memory.
            frame_bytes = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                           * int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) * 3)
            if frame_bytes <= 0:
                frame_bytes = 640 * 480 * 3  # dimensions unknown: assume VGA
            available_memory = psutil.virtual_memory().available
            batch_size = max(1, min(2000, (available_memory // 2) // frame_bytes))

            # Report roughly every 10% of frames; never 0, so the modulo
            # below is safe when the frame count is unknown.
            progress_step = max(1, math.ceil(total_frames / 10))

            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            output_path = os.path.join(output_folder, f"compressed_{test_name}_{os.path.basename(video)}.avi")
            out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
            if not out.isOpened():
                logging.error(f"Unable to open video writer for {output_path}")
                continue

            frame_batch = []
            frame_count = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                frame_batch.append(frame)
                frame_count += 1

                # Flush a full batch to the writer
                if len(frame_batch) >= batch_size:
                    process_frame_batch(frame_batch, out, **batch_kwargs)
                    frame_batch = []

                # Periodically log resource usage and refresh the progress bar
                if frame_count % progress_step == 0:
                    memory_usage = psutil.virtual_memory().percent
                    cpu_usage = psutil.cpu_percent(interval=0.1)
                    disk_usage = psutil.disk_usage(output_folder).percent
                    logging.info(f"Memory Usage: {memory_usage}%, CPU Usage: {cpu_usage}%, Disk Usage: {disk_usage}%")
                    if total_frames > 0:
                        display_progress_bar(frame_count, total_frames)

            # Process remaining frames in the last batch
            if frame_batch:
                process_frame_batch(frame_batch, out, **batch_kwargs)
        finally:
            # Release handles even on error so the output container is finalized
            cap.release()
            if out is not None:
                out.release()

        times.append(time.time() - start_time)

        # Log the time taken for this video
        logging.info(f"Compression time for {video}: {times[-1]:.2f}s")

    return times


# Parallel video compression using Dask with optimized processing
@delayed
def compress_video_dask(video, output_folder, test_name, batch_size=500, **kwargs):
    """Re-encode one video to an XVID ``.avi`` file as a lazy Dask task.

    Because of the ``@delayed`` decorator, calling this function only builds
    a task; the body runs when that task is computed.

    Args:
        video: Path of the input video.
        output_folder: Directory for the output file; created if missing.
        test_name: Label embedded in the output file name.
        batch_size: Number of frames buffered before they are written.
            Values below 1 are treated as 1.
        **kwargs: Optional settings. ``fps`` (default 20.0) and ``resolution``
            (``(width, height)``, default (640, 480)) configure the writer;
            all keys are forwarded to ``process_frame_batch``.

    Returns:
        Elapsed wall-clock seconds, or 0 when the input fails validation or
        the output writer cannot be opened.
    """
    if not validate_video_file(video):
        logging.error(f"Skipping {video} due to validation failure.")
        return 0

    # Create the output directory if needed; exist_ok makes this safe when
    # several tasks target the same folder.
    os.makedirs(output_folder, exist_ok=True)

    frame_size = tuple(kwargs.get('resolution', (640, 480)))
    # Always pass the writer's frame size on to the batch processor so every
    # written frame has the size the writer was opened with.
    batch_kwargs = {**kwargs, 'resolution': frame_size}
    # A non-positive batch size would never flush the buffer inside the loop
    batch_size = max(1, int(batch_size))

    start_time = time.time()
    cap = cv2.VideoCapture(video)
    out = None
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Report roughly every 10% of frames; never 0, so the modulo below is
        # safe when the frame count is unknown.
        progress_step = max(1, math.ceil(total_frames / 10))

        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        output_path = os.path.join(output_folder, f"compressed_{test_name}_{os.path.basename(video)}.avi")
        out = cv2.VideoWriter(output_path, fourcc, kwargs.get('fps', 20.0), frame_size)
        if not out.isOpened():
            logging.error(f"Unable to open video writer for {output_path}")
            return 0

        frame_batch = []
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_batch.append(frame)
            frame_count += 1

            # Flush a full batch to the writer
            if len(frame_batch) >= batch_size:
                process_frame_batch(frame_batch, out, **batch_kwargs)
                frame_batch = []

            # Throttle progress updates (update every 10% of frames)
            if total_frames > 0 and frame_count % progress_step == 0:
                display_progress_bar(frame_count, total_frames)

        # Process remaining frames in the last batch
        if frame_batch:
            process_frame_batch(frame_batch, out, **batch_kwargs)
    finally:
        # Release handles even on error so the output container is finalized
        cap.release()
        if out is not None:
            out.release()

    return time.time() - start_time


# Optimized Dask parallel compression with dynamic worker pool management
def compress_videos_parallel(input_videos, output_folder, scheduler='threads', **kwargs):
    """Compress all input videos concurrently through Dask.

    Args:
        input_videos: Sequence of input video paths.
        output_folder: Directory passed to every compression task.
        scheduler: Dask scheduler name handed to ``compute``.
        **kwargs: Forwarded unchanged to ``compress_video_dask`` (this is how
            ``test_name`` and the encoding options reach each task).

    Returns:
        tuple: One result per input video, in input order; empty when there
        are no input videos.
    """
    tasks = [compress_video_dask(video, output_folder, **kwargs) for video in input_videos]
    if not tasks:
        return ()

    # Limit the pool to the number of videos or CPUs, whichever is smaller
    pool_size = min(len(tasks), multiprocessing.cpu_count())
    return compute(*tasks, scheduler=scheduler, num_workers=pool_size)


# Enhanced test runner with benchmarking and logging
def run_tests(test_cases, scheduler='threads'):
    """Run each test case sequentially and through Dask, then chart the timings.

    Args:
        test_cases: Sequence of dicts with the keys ``name`` (label),
            ``videos`` (input paths) and ``params`` (keyword arguments
            forwarded to both compression functions).
        scheduler: Dask scheduler name used for the parallel run.

    Side effects:
        Writes outputs to ``output_sequential`` and ``output_dask``, saves the
        chart to ``compression_time_comparison.png`` and shows it with
        ``plt.show()``.
    """
    sequential_dir = 'output_sequential'
    dask_dir = 'output_dask'
    # Nothing else guarantees these hard-coded folders exist, so create them
    # before any output is written.
    for folder in (sequential_dir, dask_dir):
        os.makedirs(folder, exist_ok=True)

    sequential_times = []
    dask_times = []

    for i, test in enumerate(test_cases):
        test_name = f"test_{i + 1}_{test['name']}"  # Unique test name for each case

        print(f"\nRunning Test Case {i + 1}: {test['name']}")
        logging.info(f"Test Case {i + 1}: {test['name']} started.")

        # Sequential Compression
        print("\nRunning sequential compression...")
        sequential_start_time = time.time()
        compress_videos_sequential(test['videos'], sequential_dir, test_name, **test['params'])
        total_seq_time = time.time() - sequential_start_time
        sequential_times.append(total_seq_time)
        print(f"Sequential Compression Time: {total_seq_time:.2f}s")

        # Dask Compression
        print("\nRunning Dask parallel compression...")
        dask_start_time = time.time()  # measure only Dask time
        compress_videos_parallel(test['videos'], dask_dir, scheduler=scheduler, test_name=test_name, **test['params'])
        total_dask_time = time.time() - dask_start_time
        dask_times.append(total_dask_time)
        print(f"Dask Parallel Compression Time: {total_dask_time:.2f}s")

        # Log the total time for the test case
        total_time_test = total_seq_time + total_dask_time
        logging.info(f"Test Case {i + 1}: {test['name']} completed in {total_time_test:.2f}s.")
        print(f"Total Test Time: {total_time_test:.2f}s")

    # Plot side-by-side bar chart
    plt.figure(figsize=(10, 6))
    index = range(len(test_cases))
    bar_width = 0.35

    plt.bar(index, sequential_times, bar_width, label='Sequential')
    # Shift the second series by one bar width so the pairs sit side by side
    plt.bar([i + bar_width for i in index], dask_times, bar_width, label='Dask Parallel')

    plt.xlabel('Test Cases')
    plt.ylabel('Time (seconds)')
    plt.title('Compression Time Comparison: Sequential vs Dask Parallel (Optimized)')
    # Centre each tick label between its pair of bars
    plt.xticks([i + bar_width / 2 for i in index], [tc['name'] for tc in test_cases], rotation=45, ha='right')
    plt.legend()
    plt.tight_layout()
    plt.savefig('compression_time_comparison.png')
    plt.show()


# Example test cases
# Each entry has:
#   "name"   - label used in console output, output file names and the chart
#   "videos" - input video paths
#   "params" - keyword arguments forwarded to the compression functions
# NOTE(review): no function in this file reads a "bitrate" key (only
# 'grayscale', 'resolution' and 'fps' are consulted), so the three cases below
# currently run with identical encoding settings - confirm whether bitrate
# control was intended.
test_cases = [
    {"name": "Default Compression", "videos": ["suns.mp4", "suns1.mp4", "suns2.mp4"], "params": {}},
    {"name": "Low Bitrate Compression", "videos": ["suns.mp4", "suns1.mp4", "suns2.mp4"], "params": {"bitrate": "500k"}},
    {"name": "High Bitrate Compression", "videos": ["suns.mp4", "suns1.mp4", "suns2.mp4"], "params": {"bitrate": "3000k"}},
]

# Run the benchmark only when the file is executed as a script
if __name__ == "__main__":
    run_tests(test_cases, scheduler='threads')  # can change 'threads' to 'processes' or 'single-threaded' if needed



我尝试过的:

  1. 文件格式:我尝试以各种格式保存输出视频(例如.mp4、.avi),但问题仍然存在。
  2. 编解码器问题:我已经确认所使用的编解码器与容器格式兼容,但输出文件仍然无法播放。例如,我对 .avi 文件使用了 cv2.VideoWriter_fourcc(*'XVID')。
  3. OpenH264安装:我安装了openh264-1.8.0-win64.dll,希望能够解决编解码器问题,但这也没有解决问题。

我面临一个重大问题:输出文件无法播放。如果您能提供有关如何解决此问题的指导,我将不胜感激。

python opencv parallel-processing pycharm codec
1个回答
-1
投票

你可以使用 moviepy 代替 OpenCV,但处理速度可能不如 OpenCV 快。

© www.soinside.com 2019 - 2024. All rights reserved.