I'm working on a Python Flask app to chat with my data. In the console I get a streamed response directly from OpenAI, because I can enable streaming with the flag streaming=True.
The problem is that I can't "forward" or "display" that stream from my API call.
The code that handles OpenAI and the chain is:
def askQuestion(self, collection_id, question):
    collection_name = "collection-" + str(collection_id)
    self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature,
                          openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True,
                          callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))
    self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key='answer')
    chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)

    self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
                                                       return_source_documents=True, verbose=VERBOSE,
                                                       memory=self.memory)

    result = self.chain({"question": question})

    res_dict = {
        "answer": result["answer"],
    }
    res_dict["source_documents"] = []
    for source in result["source_documents"]:
        res_dict["source_documents"].append({
            "page_content": source.page_content,
            "metadata": source.metadata
        })

    return res_dict
And the API route code:
@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):
    question = request.form["question"]
    # response_generator = document_thread.askQuestion(collection_id, question)
    # return jsonify(response_generator)

    def stream(question):
        completion = document_thread.askQuestion(collection_id, question)
        for line in completion['answer']:
            yield line

    return app.response_class(stream_with_context(stream(question)))
I'm testing my endpoint with curl and passing the -N flag, so if a streamable response is possible I should see it.
When I make the API call, the endpoint first waits while the data is processed (I can see the streamed answer in the VS Code terminal), and only once it has finished do I get everything displayed in one go.
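For example, the test call looks roughly like this (host, port, collection id and question text are placeholders):

curl -N -X POST -d "question=..." http://localhost:5000/collection/1/ask_question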
By using threading and a callback, we can get a streaming response from the Flask API.

In the Flask API, create a queue and register the tokens into it through a LangChain callback:
class StreamingHandler(BaseCallbackHandler):
    ...

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put(token)
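The elided parts of StreamingHandler just need to hold on to the queue and, ideally, signal when generation is done. A minimal sketch, assuming the queue is passed into the constructor and a None sentinel marks the end of the stream (both are assumptions on top of the snippet above):

from queue import Queue
from langchain.callbacks.base import BaseCallbackHandler

class StreamingHandler(BaseCallbackHandler):
    def __init__(self, queue: Queue):
        # the Flask route owns this queue and reads tokens from it
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        # called by LangChain for every streamed token
        self.queue.put(token)

    def on_llm_end(self, response, **kwargs) -> None:
        # None acts as a sentinel so the consumer knows the stream is over
        self.queue.put(None)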
You can then get the same tokens back out of that queue inside your Flask route.
from flask import Response, stream_with_context
import threading
from queue import Queue

@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def stream_output(collection_id):
    question = request.form["question"]
    q = Queue()

    def generate(rq: Queue):
        ...
        # add your logic to prevent the while loop
        # from running indefinitely (one option is sketched just below)
        while ...:
            yield rq.get()

    callback_fn = StreamingHandler(q)
    threading.Thread(target=askQuestion, args=(collection_id, question, callback_fn)).start()

    return Response(stream_with_context(generate(q)))
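With a sentinel like that in place, one way to stop the generate() loop is to break as soon as None comes off the queue; a sketch that could slot into the route above (it assumes the StreamingHandler sketch that pushes None in on_llm_end):

def generate(rq: Queue):
    # keep yielding tokens until the callback signals completion
    while True:
        token = rq.get()       # blocks until the worker thread puts something
        if token is None:      # sentinel pushed by on_llm_end
            break
        yield token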
Add the custom StreamingHandler callback above to the ChatOpenAI instance in your langchain code:
self.llm = ChatOpenAI(
    model_name=self.model_name,
    temperature=self.temperature,
    openai_api_key=os.environ.get('OPENAI_API_KEY'),
    streaming=True,
    callbacks=[callback_fn],
)
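Since the route above starts askQuestion on a background thread and passes the handler in as a third argument, the method also has to accept it and hand it to the model. A sketch of how the signature and the ChatOpenAI call might change (the extra callback_fn parameter is an assumption that mirrors the threading.Thread call above):

def askQuestion(self, collection_id, question, callback_fn):
    # same setup as before, but the streaming handler comes from the caller
    self.llm = ChatOpenAI(
        model_name=self.model_name,
        temperature=self.temperature,
        openai_api_key=os.environ.get('OPENAI_API_KEY'),
        streaming=True,
        callbacks=[callback_fn],
    )
    # ... rest of the memory / retriever / chain setup stays unchanged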
For reference: not sure whether my answer is any clearer, but it is similar to @varunsinghal's answer; hope this helps :)
import os
import threading
from queue import Queue, Empty

from flask import Flask, request, jsonify, Response, stream_with_context
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain.schema.output import LLMResult
from langchain.chains import LLMChain
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import (
    CallbackManager,
    StreamingStdOutCallbackHandler,
    BaseCallbackHandler
)

app = Flask(__name__)

token_queue = Queue()  # from queue import Queue

global_memory = ConversationBufferMemory(
    memory_key="chat_history",
    input_key="input_query"
)
class LLMTokenQueueHandler(BaseCallbackHandler):
    """
    This is to change the behavior of LLMChain to
    store the outputted tokens to a queue
    """
    def on_llm_new_token(
        self,
        token: str,
        **kwargs
    ) -> None:
        token_queue.put({"type": "token", "value": token})

    def on_llm_end(
        self,
        response: LLMResult,
        **kwargs
    ) -> None:
        token_queue.put({'type': 'end'})
def generate_text_response(
    input_query: str
) -> None:
    """
    Generate text response from LLM
    note that we are not streaming from this
    function but from the stream_tokens() function
    """
    prompt_template = """
    input your prompt template

    Chat History:
    {chat_history}

    Human Input:
    {input_query}
    """

    # adding the LLMTokenQueueHandler to the callback manager
    # so now the tokens are automatically stored into token_queue
    gptchat = ChatOpenAI(
        model_name='model_name',
        temperature=0.25,
        openai_api_key=os.environ.get('OPENAI_API_KEY'),
        streaming=True,
        callback_manager=CallbackManager([LLMTokenQueueHandler()])
    )

    prompt = PromptTemplate(
        input_variables=[
            "chat_history",
            "input_query"
            # add more variables if needed
        ],
        template=prompt_template
    )

    llm_chain = LLMChain(
        llm=gptchat,
        prompt=prompt,
        memory=global_memory,
        verbose=False
    )

    # this streaming call triggers the process that
    # stores the answer tokens into the queue
    for chunk_response in llm_chain.stream(
        {
            "input_query": input_query,
        }
    ):
        print(chunk_response)
def stream_tokens():
    """Generator function to stream tokens."""
    while True:
        # Wait for a token to be available in the queue and retrieve it
        token_dict = token_queue.get()
        print("token_dict: ", token_dict)

        if token_dict["type"] == "token":
            # encode str as bytes
            yield token_dict['value'].encode('utf-8')
        # when streaming ends with the 'end' token, break out of the loop
        elif token_dict["type"] == "end":
            break
@app.route('/stream', methods=['POST'])
def stream_text_response():
    """
    Stream text response with user input query
    """
    input_json = request.get_json()
    input_query = input_json.get('stream', '')

    # Start generate_text_response in a separate thread to avoid blocking
    threading.Thread(
        target=generate_text_response,
        args=(input_query,)
    ).start()

    # Stream tokens back to the client as they are produced;
    # we do not stream generate_text_response itself, since it
    # doesn't yield the streamed tokens directly
    return Response(
        stream_with_context(stream_tokens())
    )
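To consume the endpoint from Python rather than curl -N, the response can be read incrementally. A small sketch using the requests library; the URL and the question text are placeholders:

import requests

# URL and payload are placeholders; the route above expects JSON with a 'stream' key
response = requests.post(
    "http://localhost:5000/stream",
    json={"stream": "your question here"},
    stream=True,  # ask requests not to buffer the whole body
)

for chunk in response.iter_content(chunk_size=None):
    # each chunk is one or more utf-8 encoded tokens yielded by stream_tokens()
    print(chunk.decode("utf-8"), end="", flush=True)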