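I have a RAG chatbot whose frontend and backend run on different ports.
They communicate over a REST API. The problem is that I am trying to store both the "user" messages and the "ai" replies in a chat history database.
When the user sends a message, it is easy to add it to the chat history database. The "AI" message, however, is streamed, so the full message is not available right away, and I am using async operations.
Is there a simple way to solve this? I am using Langchain for the RAG service.
RAG service code: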
async def arag_stream(self, input: str, chat_history: List[Dict], authorization: str):
    try:
        retriever_chain = self._create_retriever_chain(authorization)
        document_chain = self._create_document_chain(self.streaming_llm)
        conversation_chain = create_retrieval_chain(retriever_chain, document_chain)
        async for chunk in conversation_chain.astream({
            "chat_history": chat_history,
            "input": input
        }):
            if 'answer' in chunk:
                yield f"{chunk['answer']}" or ""
    except Exception as e:
        logging.error(f"Error in arag_stream: {e}")
        yield f"data: Error: {str(e)}"
API route code:
@router.post("/message_stream")
async def message_stream(request: Request, message: Message):
    try:
        history_key = "public_chat_history"
        # Retrieve existing chat history
        chat_history = chat_service.get_chat_history(request, history_key=history_key)
        chat_service.add_message_to_history(
            request,
            history_key=history_key,
            message=message.text,
            role="user"
        )
        complete_response = []  # This will collect the streamed chunks
        # Event generator for streaming responses chunk by chunk
        async def event_generator():
            try:
                async for chunk in rag_service.arag_stream(
                    input=message.text,
                    chat_history=chat_history,
                    authorization="public"
                ):
                    if chunk:
                        # Add each chunk to the complete response
                        complete_response.append(chunk)
                        print(complete_response)
                        yield chunk  # Stream each chunk to the client in real-time
            except Exception as e:
                logging.error(f"Error in streaming response: {e}")
                raise e  # Propagate the error for response
        # Capture the StreamingResponse
        response = StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "text/event-stream",
                "X-Accel-Buffering": "no"  # Disable proxy buffering
            }
        )
        # After the streaming completes, save the full response in chat history
        async def finalize_chat_history():
            # Wait until the generator completes
            # Join the complete response chunks to form a full message
            try:
                print("inside finalize_chat_history")
                print(complete_response)
                full_response = ''.join(complete_response)
                if full_response:
                    chat_service.add_message_to_history(
                        request,
                        history_key=history_key,
                        message=full_response,
                        role="ai"
                    )
                    logging.info(f"Full response saved to chat history: {full_response}")
                else:
                    logging.warning("Full response is empty. Skipping saving to chat history.")
                # Debug log to confirm saving the full response
            except Exception as e:
                logging.error(f"Error finalizing chat history: {e}")
        await finalize_chat_history()
        return response
    except Exception as e:
        logging.error(f"Streaming error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
Here the message saved to the database is empty, which is understandable because finalize_chat_history is called before event_generator runs.
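To make the ordering problem concrete, here is a minimal standalone sketch (plain asyncio, no FastAPI; the chunk values are made up for illustration, and `collected` stands in for `complete_response`). Creating an async generator does not run its body, so anything that reads the collected chunks before the generator is iterated sees an empty list:

```python
import asyncio

collected = []  # plays the role of complete_response (illustrative name)

async def event_generator():
    # The body only runs while a consumer iterates over the generator.
    for chunk in ["Hello", ", ", "world"]:  # made-up chunks
        collected.append(chunk)
        yield chunk

async def main():
    gen = event_generator()        # creates the generator object; nothing has run yet
    before = "".join(collected)    # what finalize_chat_history would see at this point
    async for _ in gen:            # in the route, this happens only after the handler returns
        pass
    after = "".join(collected)
    return before, after

before, after = asyncio.run(main())
print(repr(before), repr(after))   # prints '' 'Hello, world'
```

In the route above, the handler returns the StreamingResponse first and the server iterates the generator afterwards, so the situation matches the `before` value here.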
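I tried to fix this by creating a new async method that pushes to the database after the whole stream has finished, but it did not solve the problem.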
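For reference, one server-side option that is not what I ended up using: do the save inside the generator itself, in a `finally` block, so it runs once the last chunk has been sent. The sketch below is a minimal standalone version under some assumptions: `stream_and_save`, `fake_rag_stream`, and the `history` list are hypothetical names standing in for the real `arag_stream` and `chat_service.add_message_to_history`, and the save function is assumed to be synchronous.

```python
import asyncio
from typing import AsyncIterator, Callable, List

async def stream_and_save(
    source: AsyncIterator[str],
    save: Callable[[str], None],
) -> AsyncIterator[str]:
    """Forward chunks from `source`, then call `save` once with the joined text."""
    parts: List[str] = []
    try:
        async for chunk in source:
            if chunk:
                parts.append(chunk)
                yield chunk
    finally:
        # Runs when the stream ends, raises, or the generator is closed early.
        full_response = "".join(parts)
        if full_response:
            save(full_response)

# Hypothetical stand-in for rag_service.arag_stream(...)
async def fake_rag_stream():
    for chunk in ["The ", "answer ", "is 42."]:
        yield chunk

history = []  # hypothetical stand-in for the chat history store

def save_ai_message(text: str) -> None:
    history.append({"role": "ai", "message": text})

async def main():
    sent = []
    async for chunk in stream_and_save(fake_rag_stream(), save_ai_message):
        sent.append(chunk)  # the real route would send each chunk to the client
    return sent

sent = asyncio.run(main())
```

In the route, the wrapped generator would be what gets passed to StreamingResponse in place of `event_generator()`. Because the save is in `finally`, a partial reply is also stored if the stream is cut short; whether that is desirable depends on the application.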
I solved the problem by creating an endpoint for pushing chat history:
@router.post("/public_history")
def add_message_to_public_history(
    request: Request,
    message: Message,
    role: str,
):
    try:
        history_key = "public_chat_history"
        chat_service.add_message_to_history(
            request,
            history_key=history_key,
            message=message.text,
            role=role)
        return {"message": "Message added to public chat history"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
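Each time output is streamed to the frontend, we append it to a variable. When the stream completes, we send a request to the "public_history" route to add the message to the chat history.
Frontend code: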
while (true) {
    const { value, done } = await reader.read();
    if (done) {
        break;
    }
    const chunk = decoder.decode(value, { stream: true });
    console.log(chunk);
    if (chunk) {
        count_chunk += 1;
        outputmsg += chunk; // Append the chunk
        conversationBoard.lastChild.remove();
        addMessage('AI', 'icons/message-bot.png', outputmsg, false);
        conversationBoard.scrollTop = conversationBoard.scrollHeight;
    }
    console.log(count_chunk);
    console.log(chunk);
}
addHistory(outputmsg, 'AI');