我正在开发一个项目,需要使用 OpenAI 的 Whisper 模型转录存储在 Google Cloud Storage 存储桶中的音频。音频以 Opus 编码的 WebM 格式存储,由于文件大小,我以 30 秒的块传输音频。
为了将每个块转换为与 Whisper 兼容的 WAV(16 kHz、单声道、16 位 PCM),我使用 FFmpeg。第一个块转换成功,但后续块转换失败。我怀疑这是因为每个块缺少 WebM 容器的标头,FFmpeg 需要正确解释 Opus 编解码器。
这是我的方法的简化版本:
下载块:我从 GCS 下载每个块作为字节。 使用 FFmpeg 转换:我将字节传递给 FFmpeg,以将每个块从 WebM/Opus 转换为 WAV。
async def handle_transcription_and_notify(
consultation_service: ConsultationService,
consultation_processor: ConsultationProcessor,
consultation: Consultation,
language: str,
notes: str,
clinic_id: str,
vet_email: str,
trace_id: str,
blob_path: str,
max_retries: int = 3,
retry_delay: int = 5,
max_concurrent_tasks: int = 3
):
"""
Handles the transcription process by streaming the file from GCS, converting to a compatible format,
and notifying the client via WebSocket.
"""
chunk_duration_sec = 30 # 30 seconds per chunk
logger.info(f"Starting transcription process for consultation {consultation.consultation_id}",
extra={'trace_id': trace_id})
# Initialize GCS client
service_account_key = os.environ.get('SERVICE_ACCOUNT_KEY_BACKEND')
if not service_account_key:
logger.error("Service account key not found in environment variables", extra={'trace_id': trace_id})
await send_discord_alert(
f"Service account key not found for consultation {consultation.consultation_id}.\nTrace ID: {trace_id}"
)
return
try:
service_account_info = json.loads(service_account_key)
credentials = service_account.Credentials.from_service_account_info(service_account_info)
except Exception as e:
logger.error(f"Error loading service account credentials: {str(e)}", extra={'trace_id': trace_id})
await send_discord_alert(
f"Error loading service account credentials for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
return
# Initialize GCS client
service_account_key = os.environ.get('SERVICE_ACCOUNT_KEY_BACKEND')
if not service_account_key:
logger.error("Service account key not found in environment variables", extra={'trace_id': trace_id})
await send_discord_alert(
f"Service account key not found for consultation {consultation.consultation_id}.\nTrace ID: {trace_id}"
)
return
try:
service_account_info = json.loads(service_account_key)
credentials = service_account.Credentials.from_service_account_info(service_account_info)
except Exception as e:
logger.error(f"Error loading service account credentials: {str(e)}", extra={'trace_id': trace_id})
await send_discord_alert(
f"Error loading service account credentials for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
return
storage_client = storage.Client(credentials=credentials)
bucket_name = 'vetz_consultations'
blob = storage_client.bucket(bucket_name).get_blob(blob_path)
bytes_per_second = 16000 * 2 # 32,000 bytes per second
chunk_size_bytes = 30 * bytes_per_second
size = blob.size
async def stream_blob_in_chunks(blob, chunk_size):
loop = asyncio.get_running_loop()
start = 0
size = blob.size
while start < size:
end = min(start + chunk_size - 1, size - 1)
try:
logger.info(f"Requesting chunk from {start} to {end}", extra={'trace_id': trace_id})
chunk = await loop.run_in_executor(
None, lambda: blob.download_as_bytes(start=start, end=end)
)
if not chunk:
break
logger.info(f"Yielding chunk from {start} to {end}, size: {len(chunk)} bytes",
extra={'trace_id': trace_id})
yield chunk
start += chunk_size
except Exception as e:
logger.error(f"Error downloading chunk from {start} to {end}: {str(e)}", exc_info=True,
extra={'trace_id': trace_id})
raise e
async def convert_to_wav(chunk_bytes, chunk_idx):
"""
Convert audio chunk to WAV format compatible with Whisper, ensuring it's 16 kHz, mono, and 16-bit PCM.
"""
try:
logger.debug(f"Processing chunk {chunk_idx}: size = {len(chunk_bytes)} bytes")
detected_format = await detect_audio_format(chunk_bytes)
logger.info(f"Detected audio format for chunk {chunk_idx}: {detected_format}")
input_io = io.BytesIO(chunk_bytes)
output_io = io.BytesIO()
# ffmpeg command to convert webm/opus to WAV with 16 kHz, mono, and 16-bit PCM
# ffmpeg command with debug information
ffmpeg_command = [
"ffmpeg",
"-loglevel", "debug",
"-f", "s16le", # Treat input as raw PCM data
"-ar", "48000", # Set input sample rate
"-ac", "1", # Set input to mono
"-i", "pipe:0",
"-ar", "16000", # Set output sample rate to 16 kHz
"-ac", "1", # Ensure mono output
"-sample_fmt", "s16", # Set output format to 16-bit PCM
"-f", "wav", # Output as WAV format
"pipe:1"
]
process = subprocess.Popen(
ffmpeg_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate(input=input_io.read())
if process.returncode == 0:
logger.info(f"FFmpeg conversion completed successfully for chunk {chunk_idx}")
output_io.write(stdout)
output_io.seek(0)
# Save the WAV file locally for listening
output_dir = "converted_chunks"
os.makedirs(output_dir, exist_ok=True)
file_path = os.path.join(output_dir, f"chunk_{chunk_idx}.wav")
with open(file_path, "wb") as f:
f.write(stdout)
logger.info(f"Chunk {chunk_idx} saved to {file_path}")
return output_io
else:
logger.error(f"FFmpeg failed for chunk {chunk_idx} with return code {process.returncode}")
logger.error(f"Chunk {chunk_idx} - FFmpeg stderr: {stderr.decode()}")
return None
except Exception as e:
logger.error(f"Unexpected error in FFmpeg conversion for chunk {chunk_idx}: {str(e)}")
return None
async def transcribe_chunk(idx, chunk_bytes):
for attempt in range(1, max_retries + 1):
try:
logger.info(f"Transcribing chunk {idx + 1} (attempt {attempt}).", extra={'trace_id': trace_id})
# Convert to WAV format
wav_io = await convert_to_wav(chunk_bytes, idx)
if not wav_io:
logger.error(f"Failed to convert chunk {idx + 1} to WAV format.")
return ""
wav_io.name = "chunk.wav"
chunk_transcription = await consultation_processor.transcribe_audio_whisper(wav_io)
logger.info(f"Chunk {idx + 1} transcribed successfully.", extra={'trace_id': trace_id})
return chunk_transcription
except Exception as e:
logger.error(f"Error transcribing chunk {idx + 1} (attempt {attempt}): {str(e)}", exc_info=True,
extra={'trace_id': trace_id})
if attempt < max_retries:
await asyncio.sleep(retry_delay)
else:
await send_discord_alert(
f"Max retries reached for chunk {idx + 1} in consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
return "" # Return empty string for failed chunk
await notification_manager.send_personal_message(
f"Consultation {consultation.consultation_id} is being transcribed.", vet_email
)
try:
idx = 0
full_transcription = []
async for chunk in stream_blob_in_chunks(blob, chunk_size_bytes):
transcription = await transcribe_chunk(idx, chunk)
if transcription:
full_transcription.append(transcription)
idx += 1
combined_transcription = " ".join(full_transcription)
consultation.full_transcript = (consultation.full_transcript or "") + " " + combined_transcription
consultation_service.save_consultation(clinic_id, vet_email, consultation)
logger.info(f"Transcription saved for consultation {consultation.consultation_id}.",
extra={'trace_id': trace_id})
except Exception as e:
logger.error(f"Error during transcription process: {str(e)}", exc_info=True, extra={'trace_id': trace_id})
await send_discord_alert(
f"Error during transcription process for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
return
await notification_manager.send_personal_message(
f"Consultation {consultation.consultation_id} has been transcribed.", vet_email
)
try:
template_service = TemplateService()
medical_record_template = template_service.get_template_by_name(
consultation.medical_record_template_id).sections
sections = await consultation_processor.extract_structured_sections(
transcription=consultation.full_transcript,
notes=notes,
language=language,
template=medical_record_template,
)
consultation.sections = sections
consultation_service.save_consultation(clinic_id, vet_email, consultation)
logger.info(f"Sections processed for consultation {consultation.consultation_id}.",
extra={'trace_id': trace_id})
except Exception as e:
logger.error(f"Error processing sections for consultation {consultation.consultation_id}: {str(e)}",
exc_info=True, extra={'trace_id': trace_id})
await send_discord_alert(
f"Error processing sections for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
raise e
await notification_manager.send_personal_message(
f"Consultation {consultation.consultation_id} is fully processed.", vet_email
)
logger.info(f"Successfully processed consultation {consultation.consultation_id}.",
extra={'trace_id': trace_id})
首先,我不明白你为什么要分块地这样做。 从技术角度来看,这没有任何意义,也没有实际意义。 音频中肯定有时您想要转录的单词和短语与任意 30 秒边界重叠?
我认为您将其作为流来完成会更好。
一些高级步骤:
ffmpeg -i <url> ...
),并让它使用自己的 HTTP 客户端来获取和传输它。 FFmpeg 将根据需要对流施加背压,并在需要时在流通过时对其进行节流。 (不过,这可能不是必需的,我们只是解码 Opus,如今它几乎占用了零 CPU。)