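Problem statement:
I am getting an error while trying to transcribe the audio of a YouTube video with the OpenAI API in Python. My goal is to save the audio to an S3 bucket and then pass the S3 URL to the OpenAI API for transcription. I have not been able to get this working and need help resolving the error.
Details:
I built a Flask application in which the user supplies a link to a YouTube video together with start and end timestamps. The application should download the video, extract the specified segment, transcribe its audio with the OpenAI API, and return the transcript. I use MoviePy for the video processing and the OpenAI API for the transcription.
Problem encountered:
The following error occurs while the audio data is being passed to the OpenAI API:
An error occurred during processing: expected str, bytes or os.PathLike object, not BytesIO
The error appears when I try to pass the audio data directly to the OpenAI API. Here is my code: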
from flask import Flask, jsonify, request
from pytube import YouTube
import boto3
from moviepy.editor import *
from openai import OpenAI
from moviepy.video.tools.subtitles import SubtitlesClip
import random
import io
import requests
import tempfile
app = Flask(__name__)
BUCKET_NAME = "clipwave"
client = OpenAI()  # reads the API key from the OPENAI_API_KEY environment variable
colors = ["green", "yellow", "red", "white"]
fonts = ["Impact", "Comic-Sans-MS-Bold"]

def upload_to_s3(filename, video_stream):
    s3_client = boto3.client("s3")
    with requests.get(video_stream.url, stream=True) as response:
        if response.status_code == 200:
            s3_client.upload_fileobj(response.raw, BUCKET_NAME, filename)

def get_subs(clip, key_name):
    audio = clip.audio
    audio_bytes = io.BytesIO()
    audio.write_audiofile(audio_bytes, codec='pcm_s16le')  # Write audio to BytesIO object
    try:
        # Get the bytes content from BytesIO
        audio_data = audio_bytes.getvalue()
        # Upload audio data directly to S3
        upload_audio_to_s3(audio_data, BUCKET_NAME, f"audio_{key_name}")
        # Transcribe audio using OpenAI's API
        transcript = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_data,  # Pass the retrieved bytes data
            response_format="verbose_json",
            timestamp_granularities=["word"]
        )
        text = transcript.text
        timestamps = transcript.words
        return {'text': text, 'timestamps': timestamps}
    except Exception as e:
        print("An error occurred during processing:", e)
        raise

def upload_audio_to_s3(audio_bytes, bucket_name, key_name):
    s3 = boto3.client('s3')
    s3.put_object(Body=audio_bytes, Bucket=bucket_name, Key=key_name)

def get_video_url(filename):
    s3_client = boto3.client("s3")
    s3_object = s3_client.get_object(Bucket=BUCKET_NAME, Key=filename)
    if 'Location' in s3_object:
        video_url = s3_object["Location"]
        return video_url
    else:
        region = s3_client.meta.region_name
        bucket_url = f"https://{BUCKET_NAME}.s3.{region}.amazonaws.com/"
        video_url = bucket_url + filename
        return video_url

def generate_subtitles_clip(subs, delay=0.05):
    text = subs['text']
    timestamps = subs['timestamps']
    clips = []
    for word_info in timestamps:
        start_time = word_info['start'] + delay
        end_time = word_info['end'] + delay
        word = word_info['word']
        clips.append(((start_time, end_time), word.upper()))
    font = random.choice(fonts)
    color = random.choice(colors)
    return SubtitlesClip(clips, lambda txt: TextClip(
        txt, fontsize=100, color=color, method='caption',
        stroke_color="black", stroke_width=6, font=font))

def upload_video_to_s3(video_bytes, bucket_name, key_name):
    s3 = boto3.client('s3')
    s3.put_object(Body=video_bytes, Bucket=bucket_name, Key=key_name)

@app.route('/make-short', methods=['GET'])
def make_short():
    s3 = boto3.client('s3')
    link = request.args.get('link')
    start = request.args.get('start')
    end = request.args.get('end')
    if link:
        try:
            youtube_object = YouTube(link)
            video_stream = youtube_object.streams.get_highest_resolution()
            filename = video_stream.default_filename.replace(' ', '')
            upload_to_s3(filename, video_stream)
            video_url = get_video_url(filename)
            video = VideoFileClip(video_url).subclip(start, end).fx(vfx.fadeout, 1)
            # Aspect ratio and cropping logic remains unchanged
            # Generate subtitles and create SubtitlesClip
            subs_result = get_subs(video, f"subs_{filename}")
            subs_clip = generate_subtitles_clip(subs_result)
            # Overlay subtitles on the video and write the final video file to a temporary file
            final_video = CompositeVideoClip([
                video.set_duration(subs_clip.duration),
                subs_clip.set_position(((1920/2 - 1080/2), 1200)),
            ])
            temp_video_path = tempfile.NamedTemporaryFile(suffix='.mp4').name
            final_video.write_videofile(temp_video_path, codec="libx264")
            # Upload final video to S3 and clean up uploaded files
            with open(temp_video_path, 'rb') as temp_video_file:
                video_bytesio = io.BytesIO(temp_video_file.read())
            upload_video_to_s3(video_bytesio.getvalue(), BUCKET_NAME, f"{filename}_short")
            s3.delete_object(Bucket=BUCKET_NAME, Key=filename)
            s3.delete_object(Bucket=BUCKET_NAME, Key=f"subs_{filename}")
            url = get_video_url(f"{filename}_short")
            return jsonify({"message": "Video uploaded to S3 successfully!", "url": url})
        except Exception as e:
            print("An error occurred:", e)
            return jsonify({"message": "Error downloading or uploading video"}), 500
    else:
        return jsonify({"message": "Missing 'link' parameter"}), 400

if __name__ == "__main__":
    app.run(port=3000, debug=True)
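Request for help:
I am looking for guidance on how to correctly save the audio to an S3 bucket and then pass the S3 URL to the OpenAI API for transcription. What steps should I take to make the process run smoothly and avoid this error?
Any insights or suggestions would be greatly appreciated. Thank you!
What I have tried: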
I attempted to pass the audio data directly to the OpenAI API, but encountered the error mentioned above.
I also tried writing the audio to a temporary file and passing the file path to the OpenAI API, but encountered similar errors.
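To resolve the error "expected str, bytes or os.PathLike object, not BytesIO", follow these steps:
Write the audio to a temporary file on disk instead of a BytesIO object, because MoviePy's write_audiofile expects a file path.
Upload that file to S3 if you want to keep a copy of the audio.
Open the file in binary mode and pass the open file object, not the path string, to the OpenAI API.
Note that the transcription endpoint takes an uploaded file rather than a URL, so an S3 URL cannot be passed to it directly. Audio that exists only in S3 has to be downloaded before it can be transcribed.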
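A minimal sketch of that approach, in case it helps. The function name transcribe_clip_audio and its parameters are hypothetical, not part of the original code. It assumes audio behaves like clip.audio from MoviePy, openai_client like OpenAI(), and s3_client like boto3.client("s3"). The .mp3 suffix is my own choice: MoviePy infers the codec from the extension, and compressed audio is more likely than raw PCM to stay under the API's upload size limit.

```python
import os
import tempfile


def transcribe_clip_audio(audio, openai_client, s3_client, bucket, key):
    """Write the audio to a real file, copy it to S3, then transcribe it."""
    # Reserve a path on disk; write_audiofile needs a filename, not BytesIO.
    fd, path = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)
    try:
        audio.write_audiofile(path)  # codec inferred from the .mp3 suffix
        # Optional: keep a copy of the audio in S3.
        s3_client.upload_file(path, bucket, key)
        # The API receives the file contents, so pass an open binary handle.
        with open(path, "rb") as audio_file:
            transcript = openai_client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="verbose_json",
                timestamp_granularities=["word"],
            )
        return {"text": transcript.text, "timestamps": transcript.words}
    finally:
        # Remove the temporary file whether or not the calls succeeded.
        os.remove(path)
```

Inside get_subs this could be called as transcribe_clip_audio(clip.audio, client, boto3.client("s3"), BUCKET_NAME, f"audio_{key_name}.mp3"). Depending on the installed openai version, the entries in transcript.words may be objects rather than dicts, in which case generate_subtitles_clip would need attribute access such as word_info.start.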