Hi, I am currently trying to run a RAG application (an FAQ chatbot) made up of two UIs: in one we can upload individual files and store their embeddings in a Pinecone vector store, and in the other we can pull the embeddings from a selected index into the RAG chatbot. I am using gpt-4o on a paid account (Tier 1, 30,000 tokens per minute) as my main LLM, and AzureAIDocumentIntelligenceLoader to load my PDF files asynchronously (via its aload() function) so that I can ingest a 272-page PDF and chat with it. Even when I just type "hi", it responds with:

'message': 'Request too large for gpt-4o in organization org-wOFxlX2RaRVsbRdbSuZ5iBGM on tokens per minute (TPM): Limit 30000, Requested 49634. The input or output tokens must be reduced in order to run successfully. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'

When I load the same PDF with PyPDFium2Loader instead, chatting with it works fine. My first doubt is how the request comes to roughly 50,000 tokens when all I type into the chatbot is "hi". My second doubt is why I still get error code 429 even though I made the PDF loader async and added a time delay when retrieving the response.
async def extract_embeddings_upload_index(pdf_path, index_name):
    print(f"Loading PDF from path: {pdf_path}")

    # Load PDF documents asynchronously with Azure AI Document Intelligence
    async def lol(pdf_path):
        client = await AzureAIDocumentIntelligenceLoader(
            api_key="167f20e5ce49431aad891c46e2268696",
            file_path=pdf_path,
            api_endpoint="https://rx11.cognitiveservices.azure.com/",
            api_model="prebuilt-layout",
            mode="single",
        ).aload()
        return client

    txt_docs = await lol(pdf_path)
    # txt_docs = PyPDFium2Loader(pdf_path).load()  # loading with this instead worked

    # Split documents
    print("Splitting documents...")
    splt_docs = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
    docs = splt_docs.split_documents(txt_docs)
    print(f"Split into {len(docs)} chunks")

    # Initialize OpenAI embeddings
    print("Initializing OpenAI embeddings...")
    embeddings = OpenAIEmbeddings(model='text-embedding-ada-002')

    # Upload documents to Pinecone index
    print("Initializing Pinecone Vector Store...")
    dbx = PineconeVectorStore.from_documents(documents=docs, index_name=index_name, embedding=embeddings)
    print(f"Uploaded {len(docs)} documents to Pinecone index '{index_name}'")
def initialize(index_name):
    embeddings = ini_embed()
    print('11')
    dbx = PineconeVectorStore.from_existing_index(index_name=index_name, embedding=embeddings)
    print('12')
    llm = ChatOpenAI(model='gpt-4o', temperature=0.5, max_tokens=3000)
    # Alternative local model, currently unused:
    # model_id = "meta-llama/Meta-Llama-3-8B"
    # model = AutoModelForCausalLM.from_pretrained(model_id)
    # tokenizer = AutoTokenizer.from_pretrained(model_id)
    # pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=5000)
    repo_id = "meta-llama/Llama-2-7b-hf"  # unused
    print('13')
    prompt = ini_prompt()
    print('14')
    doc_chain = create_stuff_documents_chain(llm, prompt)
    print('15')
    retriever = dbx.as_retriever()
    print('16')
    ans_retrieval = create_retrieval_chain(retriever, doc_chain)
    print('17')

    # Wrap the retrieval chain with RunnableWithMessageHistory
    conversational_ans_retrieval = RunnableWithMessageHistory(
        ans_retrieval,
        lambda session_id: StreamlitChatMessageHistory(key=session_id),
        input_messages_key="input",
        history_messages_key="chat_history",
        output_messages_key="answer",
    )
    print('17')
    print(session_id)  # session_id is assumed to be a global in the Streamlit app
    print('18')
    return conversational_ans_retrieval
def run_query(retrieval_chain, input_text):
    st.write('run query')
    try:
        # Fixed delay added in an attempt to stay under the rate limit
        time.sleep(60)
        # Generate a response using the retrieval chain
        response = retrieval_chain.invoke(
            {"input": input_text},
            config={"configurable": {"session_id": f'{session_id}'}}
        )
        return response['answer']
    except KeyError as e:
        st.error(f"KeyError occurred: {e}. Check the response structure.")
        return None
To restate the doubts: first, how is the request reaching about 50,000 tokens when all I type into the chatbot is "hi"?
Second, why am I still getting error code 429?
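One way to check the first doubt is to measure how many tokens the retrieved chunks alone contribute, since create_stuff_documents_chain pastes every retrieved chunk into the gpt-4o prompt no matter how short the user message is. Below is a minimal diagnostic sketch, not part of the original post: it assumes the dbx store built in initialize() and uses tiktoken's o200k_base encoding as an approximation for gpt-4o.

import tiktoken

enc = tiktoken.get_encoding("o200k_base")  # encoding used by gpt-4o

# With chunk_size=10000 characters, even a one-word query like "hi"
# pulls in several very large chunks that all get stuffed into the prompt.
retriever = dbx.as_retriever()
chunks = retriever.invoke("hi")  # the documents that would be stuffed

total = sum(len(enc.encode(d.page_content)) for d in chunks)
print(f"{len(chunks)} chunks -> ~{total} tokens before prompt and chat history")

# Capping the number of retrieved chunks is one way to shrink the request:
small_retriever = dbx.as_retriever(search_kwargs={"k": 2})

Note that the chat history replayed by RunnableWithMessageHistory counts against the same 30,000 TPM budget, so the requested token count can keep growing across turns even for short inputs.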
Please check this link (@AshokPeddakotla-MSFT's comment).
Here, I have implemented retry logic with exponential backoff to handle rate limits gracefully.
Code:
import time
import asyncio

import streamlit as st
from openai import RateLimitError
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_community.chat_message_histories import StreamlitChatMessageHistory
from langchain_community.document_loaders import AzureAIDocumentIntelligenceLoader
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

# ini_prompt() and session_id are assumed to be defined elsewhere in the app
async def extract_embeddings_upload_index(pdf_path, index_name):
    print(f"Loading PDF from path: {pdf_path}")

    async def load_pdf(pdf_path):
        loader = AzureAIDocumentIntelligenceLoader(
            api_key="167f20e5ce49431aad891c46e2268696",
            file_path=pdf_path,
            api_endpoint="https://rx11.cognitiveservices.azure.com/",
            api_model="prebuilt-layout",
            mode="single"
        )
        return await loader.aload()

    # Retry logic with exponential backoff
    max_retries = 5
    retry_delay = 1  # initial delay in seconds
    for attempt in range(max_retries):
        try:
            txt_docs = await load_pdf(pdf_path)
            break
        except RateLimitError:
            print(f"Rate limit exceeded. Retrying in {retry_delay} seconds...")
            await asyncio.sleep(retry_delay)  # non-blocking sleep inside async code
            retry_delay *= 2  # exponential backoff
    else:
        # for/else: runs only if the loop never hit `break`
        print("Failed to load PDF after multiple attempts.")
        return

    # Split documents (smaller chunks than the original 10000/1000 keep the
    # stuffed prompt well under the TPM limit)
    print("Splitting documents...")
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    docs = splitter.split_documents(txt_docs)
    print(f"Split into {len(docs)} chunks")

    # Initialize OpenAI embeddings
    print("Initializing OpenAI embeddings...")
    embeddings = OpenAIEmbeddings(model='text-embedding-ada-002')

    # Upload documents to Pinecone index
    print("Initializing Pinecone Vector Store...")
    dbx = PineconeVectorStore.from_documents(documents=docs, index_name=index_name, embedding=embeddings)
    print(f"Uploaded {len(docs)} documents to Pinecone index '{index_name}'")
def initialize(index_name):
    embeddings = OpenAIEmbeddings(model='text-embedding-ada-002')
    dbx = PineconeVectorStore.from_existing_index(index_name=index_name, embedding=embeddings)
    llm = ChatOpenAI(model='gpt-4o', temperature=0.5, max_tokens=3000)
    prompt = ini_prompt()
    doc_chain = create_stuff_documents_chain(llm, prompt)
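    # Optional: cap how many chunks the retriever returns so the stuffed
    # prompt stays under the 30k TPM limit, e.g.
    # retriever = dbx.as_retriever(search_kwargs={"k": 2})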
    retriever = dbx.as_retriever()
    ans_retrieval = create_retrieval_chain(retriever, doc_chain)

    # Wrap the retrieval chain with RunnableWithMessageHistory
    conversational_ans_retrieval = RunnableWithMessageHistory(
        ans_retrieval,
        lambda session_id: StreamlitChatMessageHistory(key=session_id),
        input_messages_key="input",
        history_messages_key="chat_history",
        output_messages_key="answer"
    )
    return conversational_ans_retrieval
def run_query(retrieval_chain, input_text):
    st.write('run query')
    try:
        # Retry logic with exponential backoff
        max_retries = 5
        retry_delay = 1  # initial delay in seconds
        for attempt in range(max_retries):
            try:
                # Fixed 60 s delay kept from the original code
                time.sleep(60)
                # Generate a response using the retrieval chain
                response = retrieval_chain.invoke(
                    {"input": input_text},
                    config={"configurable": {"session_id": f'{session_id}'}}
                )
                return response['answer']
            except RateLimitError:
                print(f"Rate limit exceeded. Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # exponential backoff
        else:
            # for/else: runs only when every attempt failed
            st.error("Failed to retrieve response after multiple attempts.")
            return None
    except KeyError as e:
        st.error(f"KeyError occurred: {e}. Check the response structure.")
        return None
If possible, batch smaller requests together so you stay within the limits while still fetching the data you need; a sketch of batched uploads follows.
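For example, the Pinecone upload itself can be done in small batches with a short pause between them instead of one large call. This is a minimal sketch under assumptions: it reuses docs, embeddings, and index_name from the answer above, and upload_in_batches, batch_size, and pause are illustrative names, not part of any library.

def upload_in_batches(docs, embeddings, index_name, batch_size=50, pause=5):
    # Create the store from the first batch, then append the rest.
    dbx = PineconeVectorStore.from_documents(
        documents=docs[:batch_size], index_name=index_name, embedding=embeddings
    )
    for start in range(batch_size, len(docs), batch_size):
        time.sleep(pause)  # brief pause between batches to smooth the request rate
        dbx.add_documents(docs[start:start + batch_size])
    return dbx

Smaller batches spread the embedding calls over time, which keeps each minute's usage under the per-minute quota without dropping any data.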