Google Cloud Function using moviepy slows down and times out

Question

I wrote a program that takes a youtubeURL, parses the transcript into clips, downloads the original video with moviepy, trims it into clips, and uploads them to Google Storage. I can confirm the function works locally. However, when I deploy and invoke the function (using

@https_fn.on_request()
@tasks_fn.on_task_dispatched
), it does work, but it slows down dramatically and eventually times out, despite the timeout being set to 60 minutes.

Has anyone been able to process reasonably large video files with moviepy in a Google Cloud Function?

import os
import uuid
from datetime import datetime

from firebase_admin import firestore, functions
from firebase_functions import https_fn, tasks_fn, logger
from firebase_functions.options import MemoryOption, RateLimits, RetryConfig
from openai import OpenAI

# firebase_admin.initialize_app() is assumed to run at import time; helper functions
# (get_function_url, getSubtitles, chunk_subtitles, analyze_chunks_with_llm,
# download_youtube_video, combine_video_audio, trim_video, timestamp_to_seconds,
# upload_blob_from_memory, create_document, operation_is_complete) are defined elsewhere.

BACKUP_COUNT = 1  # Define this as per your needs
HOURLY_BATCH_SIZE = 5  # Define this as per your needs
BACKUP_START_DATE = datetime(2023, 1, 1)  # Define this as per your needs

@https_fn.on_request()
def queue_video_task(req: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""

    request_data = req.get_json()
    youtubeURL = request_data.get("url")
    authorID = request_data.get("authorID")

    logger.info(f"Received request for {youtubeURL}")

    id = uuid.uuid4()

    if not youtubeURL:
        return https_fn.Response('Missing URL', status=400)

    task_queue = functions.task_queue("ingestVideo") #enqueueing ingestVideo()
    target_uri = get_function_url("ingestVideo")

    schedule_time = datetime.now()

    dispatch_deadline_seconds = 60 * 30  # 30 minutes (the maximum dispatch deadline Cloud Tasks allows)

    doc_ref = firestore.client().collection('processing').document(f"{id}")
    
    doc = doc_ref.get()
    
    if doc.exists:
        if doc.to_dict().get('processing') is False:  # an earlier dispatch already finished this video
            return https_fn.Response('Video already processed', status=202)

    data = {
        "url" : youtubeURL,
        "authorID" : authorID,
        "processing" : True
    }
    doc_ref.set(data)

    # backup_date = BACKUP_START_DATE + timedelta(days=i)
    body = {
        "data": {
            "url": youtubeURL,
            "authorID" : authorID,
            "id": f"{id}"
            }
        }

    task_options = functions.TaskOptions(
        schedule_time=schedule_time,
        dispatch_deadline_seconds=dispatch_deadline_seconds,
        uri=target_uri
    )

    logger.info("Updated document")
    logger.info(f"Sent with body: {body}")
    
    task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")

@tasks_fn.on_task_dispatched(
        retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=1200),
        rate_limits=RateLimits(max_concurrent_dispatches=1), 
        memory=MemoryOption.GB_4, 
        timeout_sec=3600, 
        secrets=["OPENAI_KEY"])
def ingestVideo(req: tasks_fn.CallableRequest) -> str:

    youtubeURL = req.data["url"]
    id = req.data["id"]  # ID of the 'processing' document created by queue_video_task
    authorID = req.data["authorID"]

    logger.info(f"Received body: {req.data}")

    openai_api_key = os.environ.get('OPENAI_KEY')
    client = OpenAI(api_key=openai_api_key)

    logger.info(f"Request: {youtubeURL}")

    results = []  # ensure `results` exists even if the operation is already complete
    if not operation_is_complete(id):
        logger.info("Starting")
        subtitles = getSubtitles(youtubeURL)
        logger.info("Got Subtitles")
        chunks = chunk_subtitles(subtitles=subtitles)
        logger.info("Got Chunks")
        results = analyze_chunks_with_llm(chunks, client)
        logger.info(f"Finished clipping: found {len(results)} clips")

        # downloaded_video_path = download_youtube_video(link)
        video, audio = download_youtube_video(youtubeURL)
        logger.info("Downloaded")
        combined_buffer_path = combine_video_audio(video, audio, output_path=f"{id}.mp4")
        logger.info("Combined")

    for i, clip in enumerate(results):
        if not operation_is_complete(id):
            clip_id = uuid.uuid4()  # use a separate ID per clip so the processing-document `id` is not overwritten
            path = f"{clip_id}_clip{i}.mp4"
            logger.info("Uploading Clip")
            trimmed_video_path = trim_video(combined_buffer_path,
                                            start_time=timestamp_to_seconds(clip['start']),
                                            end_time=timestamp_to_seconds(clip['end']),
                                            output_path=path)
            upload_blob_from_memory(trimmed_video_path, path)
            create_document(clip_id, path, clip, authorID)
            logger.info("Documented Clip")

    db = firestore.client()
    doc_ref = db.collection("processing").document(f"{id}")
    doc = doc_ref.get()
    if doc.exists:
        data = doc.to_dict()
        data['processing'] = False
        doc_ref.update(data)
    
    return "Episode Processing Completed"```
python python-3.x google-cloud-functions moviepy
1 Answer

Your local machine is probably far more powerful than the default server configuration that Cloud Functions run on. For a CPU-bound process, the only practical way to speed it up is to configure a more powerful server for the function that needs to run faster.
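
With the 2nd-gen Firebase Functions Python SDK used in the question, resources can be raised per function directly in the decorator. A minimal sketch, assuming the cpu option is available in your SDK version; the specific memory and cpu values are illustrative, not a recommendation:

```python
# Sketch: give the task handler more CPU and memory (illustrative values).
from firebase_functions import tasks_fn
from firebase_functions.options import MemoryOption, RateLimits, RetryConfig

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=1200),
    rate_limits=RateLimits(max_concurrent_dispatches=1),
    memory=MemoryOption.GB_8,  # more RAM for moviepy's intermediate buffers
    cpu=4,                     # more vCPUs for the CPU-bound re-encode
    timeout_sec=3600,
    secrets=["OPENAI_KEY"],
)
def ingestVideo(req: tasks_fn.CallableRequest) -> str:
    ...  # same body as in the question
```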

If you have already maxed out the configuration, then your workload is not a good fit for Cloud Functions and you should consider other options that meet your requirements. Cloud Functions is not meant to be a product for high-performance computing jobs. There are many other GCP products that can be used for this, such as Compute Engine (though they will almost certainly be harder to use and more expensive).
