如何从 Google 云存储分块传输音频并将每个块转换为 WAV 以进行 Whisper 转录

问题描述 投票:0回答:1

我正在开发一个项目,需要使用 OpenAI 的 Whisper 模型转录存储在 Google Cloud Storage 存储桶中的音频。音频以 Opus 编码的 WebM 格式存储,由于文件大小,我以 30 秒的块传输音频。

为了将每个块转换为与 Whisper 兼容的 WAV(16 kHz、单声道、16 位 PCM),我使用 FFmpeg。第一个块转换成功,但后续块转换失败。我怀疑这是因为每个块缺少 WebM 容器的标头,FFmpeg 需要正确解释 Opus 编解码器。

这是我的方法的简化版本:

下载块:我从 GCS 下载每个块作为字节。 使用 FFmpeg 转换:我将字节传递给 FFmpeg,以将每个块从 WebM/Opus 转换为 WAV。

async def handle_transcription_and_notify(
    consultation_service: ConsultationService,
    consultation_processor: ConsultationProcessor,
    consultation: Consultation,
    language: str,
    notes: str,
    clinic_id: str,
    vet_email: str,
    trace_id: str,
    blob_path: str,
    max_retries: int = 3,
    retry_delay: int = 5,
    max_concurrent_tasks: int = 3
):
    """
    Handles the transcription process by streaming the file from GCS, converting to a compatible format, 
    and notifying the client via WebSocket.
    """
    chunk_duration_sec = 30  # 30 seconds per chunk
    logger.info(f"Starting transcription process for consultation {consultation.consultation_id}",
                extra={'trace_id': trace_id})

    # Initialize GCS client
    service_account_key = os.environ.get('SERVICE_ACCOUNT_KEY_BACKEND')
    if not service_account_key:
        logger.error("Service account key not found in environment variables", extra={'trace_id': trace_id})
        await send_discord_alert(
            f"Service account key not found for consultation {consultation.consultation_id}.\nTrace ID: {trace_id}"
        )
        return

    try:
        service_account_info = json.loads(service_account_key)
        credentials = service_account.Credentials.from_service_account_info(service_account_info)
    except Exception as e:
        logger.error(f"Error loading service account credentials: {str(e)}", extra={'trace_id': trace_id})
        await send_discord_alert(
            f"Error loading service account credentials for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
        )
        return

    # Initialize GCS client
    service_account_key = os.environ.get('SERVICE_ACCOUNT_KEY_BACKEND')
    if not service_account_key:
        logger.error("Service account key not found in environment variables", extra={'trace_id': trace_id})
        await send_discord_alert(
            f"Service account key not found for consultation {consultation.consultation_id}.\nTrace ID: {trace_id}"
        )
        return

    try:
        service_account_info = json.loads(service_account_key)
        credentials = service_account.Credentials.from_service_account_info(service_account_info)
    except Exception as e:
        logger.error(f"Error loading service account credentials: {str(e)}", extra={'trace_id': trace_id})
        await send_discord_alert(
            f"Error loading service account credentials for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
        )
        return

    storage_client = storage.Client(credentials=credentials)
    bucket_name = 'vetz_consultations'
    blob = storage_client.bucket(bucket_name).get_blob(blob_path)
    bytes_per_second = 16000 * 2  # 32,000 bytes per second
    chunk_size_bytes = 30 * bytes_per_second
    size = blob.size

    async def stream_blob_in_chunks(blob, chunk_size):
        loop = asyncio.get_running_loop()
        start = 0
        size = blob.size
        while start < size:
            end = min(start + chunk_size - 1, size - 1)
            try:
                logger.info(f"Requesting chunk from {start} to {end}", extra={'trace_id': trace_id})
                chunk = await loop.run_in_executor(
                    None, lambda: blob.download_as_bytes(start=start, end=end)
                )
                if not chunk:
                    break
                logger.info(f"Yielding chunk from {start} to {end}, size: {len(chunk)} bytes",
                            extra={'trace_id': trace_id})
                yield chunk
                start += chunk_size
            except Exception as e:
                logger.error(f"Error downloading chunk from {start} to {end}: {str(e)}", exc_info=True,
                             extra={'trace_id': trace_id})
                raise e

    async def convert_to_wav(chunk_bytes, chunk_idx):
        """
        Convert audio chunk to WAV format compatible with Whisper, ensuring it's 16 kHz, mono, and 16-bit PCM.
        """
        try:
            logger.debug(f"Processing chunk {chunk_idx}: size = {len(chunk_bytes)} bytes")

            detected_format = await detect_audio_format(chunk_bytes)
            logger.info(f"Detected audio format for chunk {chunk_idx}: {detected_format}")
            input_io = io.BytesIO(chunk_bytes)
            output_io = io.BytesIO()

            # ffmpeg command to convert webm/opus to WAV with 16 kHz, mono, and 16-bit PCM

            # ffmpeg command with debug information
            ffmpeg_command = [
                "ffmpeg",
                "-loglevel", "debug",
                "-f", "s16le",            # Treat input as raw PCM data
                "-ar", "48000",           # Set input sample rate
                "-ac", "1",               # Set input to mono
                "-i", "pipe:0",
                "-ar", "16000",           # Set output sample rate to 16 kHz
                "-ac", "1",               # Ensure mono output
                "-sample_fmt", "s16",     # Set output format to 16-bit PCM
                "-f", "wav",              # Output as WAV format
                "pipe:1"
            ]

            process = subprocess.Popen(
                ffmpeg_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            stdout, stderr = process.communicate(input=input_io.read())

            if process.returncode == 0:
                logger.info(f"FFmpeg conversion completed successfully for chunk {chunk_idx}")
                output_io.write(stdout)
                output_io.seek(0)

                # Save the WAV file locally for listening
                output_dir = "converted_chunks"
                os.makedirs(output_dir, exist_ok=True)
                file_path = os.path.join(output_dir, f"chunk_{chunk_idx}.wav")

                with open(file_path, "wb") as f:
                    f.write(stdout)
                logger.info(f"Chunk {chunk_idx} saved to {file_path}")

                return output_io
            else:
                logger.error(f"FFmpeg failed for chunk {chunk_idx} with return code {process.returncode}")
                logger.error(f"Chunk {chunk_idx} - FFmpeg stderr: {stderr.decode()}")
                return None

        except Exception as e:
            logger.error(f"Unexpected error in FFmpeg conversion for chunk {chunk_idx}: {str(e)}")
            return None

    async def transcribe_chunk(idx, chunk_bytes):
        for attempt in range(1, max_retries + 1):
            try:
                logger.info(f"Transcribing chunk {idx + 1} (attempt {attempt}).", extra={'trace_id': trace_id})

                # Convert to WAV format
                wav_io = await convert_to_wav(chunk_bytes, idx)
                if not wav_io:
                    logger.error(f"Failed to convert chunk {idx + 1} to WAV format.")
                    return ""

                wav_io.name = "chunk.wav"
                chunk_transcription = await consultation_processor.transcribe_audio_whisper(wav_io)
                logger.info(f"Chunk {idx + 1} transcribed successfully.", extra={'trace_id': trace_id})
                return chunk_transcription
            except Exception as e:
                logger.error(f"Error transcribing chunk {idx + 1} (attempt {attempt}): {str(e)}", exc_info=True,
                             extra={'trace_id': trace_id})
                if attempt < max_retries:
                    await asyncio.sleep(retry_delay)
                else:
                    await send_discord_alert(
                        f"Max retries reached for chunk {idx + 1} in consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
                    )
                    return ""  # Return empty string for failed chunk

    await notification_manager.send_personal_message(
        f"Consultation {consultation.consultation_id} is being transcribed.", vet_email
    )

    try:
        idx = 0
        full_transcription = []
        async for chunk in stream_blob_in_chunks(blob, chunk_size_bytes):
            transcription = await transcribe_chunk(idx, chunk)
            if transcription:
                full_transcription.append(transcription)
            idx += 1

        combined_transcription = " ".join(full_transcription)
        consultation.full_transcript = (consultation.full_transcript or "") + " " + combined_transcription
        consultation_service.save_consultation(clinic_id, vet_email, consultation)
        logger.info(f"Transcription saved for consultation {consultation.consultation_id}.",
                    extra={'trace_id': trace_id})

    except Exception as e:
        logger.error(f"Error during transcription process: {str(e)}", exc_info=True, extra={'trace_id': trace_id})
        await send_discord_alert(
            f"Error during transcription process for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
        )
        return

    await notification_manager.send_personal_message(
        f"Consultation {consultation.consultation_id} has been transcribed.", vet_email
    )

    try:
        template_service = TemplateService()
        medical_record_template = template_service.get_template_by_name(
            consultation.medical_record_template_id).sections

        sections = await consultation_processor.extract_structured_sections(
            transcription=consultation.full_transcript,
            notes=notes,
            language=language,
            template=medical_record_template,
        )
        consultation.sections = sections
        consultation_service.save_consultation(clinic_id, vet_email, consultation)
        logger.info(f"Sections processed for consultation {consultation.consultation_id}.",
                    extra={'trace_id': trace_id})
    except Exception as e:
        logger.error(f"Error processing sections for consultation {consultation.consultation_id}: {str(e)}",
                     exc_info=True, extra={'trace_id': trace_id})
        await send_discord_alert(
            f"Error processing sections for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
        )
        raise e

    await notification_manager.send_personal_message(
        f"Consultation {consultation.consultation_id} is fully processed.", vet_email
    )
    logger.info(f"Successfully processed consultation {consultation.consultation_id}.",
                extra={'trace_id': trace_id})

audio ffmpeg google-cloud-storage ffmpeg-python whisper
1个回答
0
投票

首先,我不明白你为什么要分块地这样做。 从技术角度来看,这没有任何意义,也没有实际意义。 音频中肯定有时您想要转录的单词和短语与任意 30 秒边界重叠?

我认为您将其作为流来完成会更好。

一些高级步骤:

  1. 为您的 GCS 对象签署 URL。
  2. 将该 URL 传递给 FFmpeg (
    ffmpeg -i <url> ...
    ),并让它使用自己的 HTTP 客户端来获取和传输它。 FFmpeg 将根据需要对流施加背压,并在需要时在流通过时对其进行节流。 (不过,这可能不是必需的,我们只是解码 Opus,如今它几乎占用了零 CPU。)
  3. 将 FFmpeg 的输出通过管道传输到转录服务的输入。
© www.soinside.com 2019 - 2024. All rights reserved.