How can I stream responses from LangChain's OpenAI through a Flask API?

Problem description · 0 votes · 2 answers

I'm building a Python Flask application for chatting with my data. In the console I get a streamed response directly from OpenAI, since I can enable streaming with the streaming=True flag.

The problem is that I cannot figure out how to "forward", or "display", that stream in my API call.

The code that handles OpenAI and the chain is:

def askQuestion(self, collection_id, question):
    collection_name = "collection-" + str(collection_id)
    self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))
    self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,  output_key='answer')
    
    chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)


    self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
                                                        return_source_documents=True,verbose=VERBOSE, 
                                                        memory=self.memory)
    

    result = self.chain({"question": question})
    
    res_dict = {
        "answer": result["answer"],
    }

    res_dict["source_documents"] = []

    for source in result["source_documents"]:
        res_dict["source_documents"].append({
            "page_content": source.page_content,
            "metadata":  source.metadata
        })

    return res_dict

And the API route code:

@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):
    question = request.form["question"]
    # response_generator = document_thread.askQuestion(collection_id, question)
    # return jsonify(response_generator)

    def stream(question):
        completion = document_thread.askQuestion(collection_id, question)
        for line in completion['answer']:
            yield line

    return app.response_class(stream_with_context(stream(question)))

I'm testing my endpoint with curl, and I'm passing the -N flag to curl, so I should get a streamable response if that is possible at all.

When I make the API call, the endpoint waits until the data has been processed (I can see the streamed answer in the VS Code terminal), and only then is everything displayed at once.

python flask openai-api langchain
2 Answers

3 votes

By using threading and a callback, we can get a streaming response from the Flask API.

In the Flask API, you can create a queue and register tokens onto it through a LangChain callback.

from queue import Queue
from langchain_core.callbacks import BaseCallbackHandler

class StreamingHandler(BaseCallbackHandler):
    def __init__(self, queue: Queue):
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put(token)  # push each new token onto the shared queue

You can then get the tokens from that same queue inside your Flask route.

from flask import Response, stream_with_context
from queue import Queue
import threading

@app.route(....)
def stream_output():
    q = Queue()

    def generate(rq: Queue):
        ...
        # add your logic to prevent the while loop
        # from running indefinitely
        while( ...):
            yield rq.get()

    callback_fn = StreamingHandler(q)

    # run the chain in a background thread so the route can stream from the queue
    threading.Thread(target=askQuestion, args=(collection_id, question, callback_fn)).start()
    return Response(stream_with_context(generate(q)))
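
The termination condition for the while loop is left open above. One way to finish the stream (a sketch, not part of the original answer) is to have the handler push a sentinel value from on_llm_end and have the generator break when it sees it:

from queue import Queue
from langchain_core.callbacks import BaseCallbackHandler

class StreamingHandler(BaseCallbackHandler):
    def __init__(self, queue: Queue):
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put(token)      # forward every generated token

    def on_llm_end(self, response, **kwargs) -> None:
        self.queue.put(None)       # sentinel: generation has finished

def generate(rq: Queue):
    while True:
        token = rq.get()           # blocks until the handler puts something
        if token is None:          # sentinel from on_llm_end
            break
        yield token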

Add the custom StreamingHandler callback above to your LangChain ChatOpenAI:

self.llm = ChatOpenAI(
    model_name=self.model_name,
    temperature=self.temperature,
    openai_api_key=os.environ.get('OPENAI_API_KEY'),
    streaming=True,
    callbacks=[callback_fn],
)
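
For the threaded call in the route to work, the askQuestion method from the question also needs to accept the handler and hand it to ChatOpenAI. A minimal sketch of that change, reusing the setup from the question (the extra callback_fn parameter is the main difference):

def askQuestion(self, collection_id, question, callback_fn):
    collection_name = "collection-" + str(collection_id)

    self.llm = ChatOpenAI(
        model_name=self.model_name,
        temperature=self.temperature,
        openai_api_key=os.environ.get('OPENAI_API_KEY'),
        streaming=True,
        callbacks=[callback_fn],   # the queue-backed StreamingHandler
    )
    self.memory = ConversationBufferMemory(
        memory_key="chat_history", return_messages=True, output_key='answer'
    )
    chroma_Vectorstore = Chroma(
        collection_name=collection_name,
        embedding_function=self.embeddingsOpenAi,
        client=self.chroma_client,
    )
    self.chain = ConversationalRetrievalChain.from_llm(
        self.llm,
        chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
        return_source_documents=True,
        verbose=VERBOSE,
        memory=self.memory,
    )

    # Tokens are pushed onto the queue by the callback while this call runs;
    # the full result only becomes available after generation has finished.
    return self.chain({"question": question})

Note that the return value (including source_documents) only exists once generation has finished, so it cannot be part of the same streamed body; it would have to be delivered separately.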



0 votes

Not sure whether my answer is any clearer, but it is similar to @varunsinghal's answer. Hope this helps :)

import os
import threading
from queue import Queue, Empty
from flask import Flask, request, jsonify, Response, stream_with_context
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain.schema.output import LLMResult
from langchain.chains import LLMChain
from langchain_openai import ChatOpenAI  # or langchain.chat_models, depending on your version
from langchain_core.callbacks import (
    CallbackManager,
    StreamingStdOutCallbackHandler,
    BaseCallbackHandler
)

app = Flask(__name__)

# queue shared between the generation thread and the streaming response
token_queue = Queue()

global_memory = ConversationBufferMemory(
    memory_key="chat_history",
    input_key="input_query"
)

class LLMTokenQueueHandler(BaseCallbackHandler): 
    """
    This is to change the behavior of LLMChain to 
    store the outputted tokens to a queue
    """ 
    def on_llm_new_token(
        self, 
        token: str, 
        **kwargs
        ) -> None:    
        token_queue.put({"type": "token", "value": token})  

    def on_llm_end(
        self, 
        response: LLMResult, 
        **kwargs
        ) -> None:
        token_queue.put({'type': 'end'})


def generate_text_response(
    input_query: str
    ) -> None:
    """
    Generate text response from LLM
    note that we are not streaming from this 
    function but from the stream_tokens() function
    """
    prompt_template = """
    input your prompt template

    Chat History: 
    {chat_history}

    Human Input: 
    {input_query}
    """

    #adding the LLMTokenQueueHandler to the callback manager
    #so now the tokens are automatically stored into token_queue
    gptchat = ChatOpenAI(
        model_name='model_name', 
        temperature= 0.25, 
        openai_api_key=os.environ.get('OPENAI_API_KEY'), 
        streaming = True,
        callback_manager=CallbackManager([LLMTokenQueueHandler()])
    )

    prompt = PromptTemplate(
        input_variables=[
        "chat_history", 
        "input_query"
        #add more variables if needed
        ],
        template=prompt_template
    )

    llm_chain = LLMChain(
        llm=gptchat,
        prompt=prompt,
        memory=global_memory,
        verbose = False
    )

    #this streaming call triggers the process to 
    #store answer tokens to queue
    for chunk_response in llm_chain.stream(
        {
            "input_query": input_query, 
        }
    ):
        print(chunk_response)


def stream_tokens():  
    """Generator function to stream tokens."""  
    while True:  
        # Wait for a token to be available in the queue and retrieve it  
        token_dict = token_queue.get()  
        print("token_dict: ", token_dict)

        if token_dict["type"] == "token":
            # encode str as byte  
            yield token_dict['value'].encode('utf-8')

        #we need to implement when streaming ends
        #with the 'end' token, then break out of loop
        elif token_dict["type"] == "end":
            break


@app.route('/stream', methods=['POST'])  
def stream_text_response():  
    """
    Stream text response with user input query
    """
    input_json = request.get_json()  
    input_query = input_json.get('stream', '')  
  
    # Start generate_text_response in a separate thread to avoid blocking  
    threading.Thread(
        target=generate_text_response, 
        args=(input_query,)
    ).start()  
  
    # Stream tokens back to the client as they are produced
    # not streaming generate_text_response as it doesn't produce
    # the streamed tokens directly  
    return Response(
        stream_with_context(stream_tokens())
    )  
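
To check that the endpoint really streams, you can consume it chunk by chunk with a small client. A sketch using requests (the host, port, and query text are assumptions; the 'stream' key matches the route above):

import requests

# iterate over the response body as it arrives instead of waiting for the full answer
with requests.post(
    "http://localhost:5000/stream",            # assumed address of the Flask app
    json={"stream": "What is LangChain?"},     # example query
    stream=True,
) as response:
    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)

With curl, the same check works with the -N flag the question already uses.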