Problem streaming data to the frontend with astream_events in Flask


I can't stream data to the frontend using astream_events. I'm working on a LangChain and Flask project.

My streaming-related code:

```python
        agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

        async def generate_response():
            response_chunks = []
            context_sources = set()

            try:
                # Streaming the events from LangChain's agent
                async for event in agent_executor.astream_events({"input": user_input, "chat_history": chat_history}, version="v1"):
                    print(event)
                    kind = event["event"]

                    # Handle the streamed content events
                    if kind == "on_chat_model_stream":
                        response_chunks.append(event["data"]["chunk"].content)
                        # Spaces are swapped for a placeholder token so the
                        # client can restore them after SSE parsing.
                        chain_response = event["data"]["chunk"].content.replace(' ', '<adding_space_token>')
                        yield f"data:{chain_response}\n\n"

                yield "data:[DONE]\n\n"

            except Exception as e:
                print(f"Error during streaming: {e}")
                yield f"data: ERROR: {str(e)}\n\n"
            finally:
                bot_response = ''.join(response_chunks)
                print(context_sources, ' :::context_sources')
                context_sources = set()
                # if len(bot_response) <= 0:
                #     return jsonify({'error': get_error_message(500)}), 500
                add_chat_message(session_id, user_input, bot_response, context_sources)
                handler = AsyncIteratorCallbackHandler()

        response = generate_response()
        print(response)
        return Response(stream_with_context(generate_response()), content_type='text/event-stream')
    except Exception as e:
        print(e)
        return jsonify({'error': get_error_message(500)}), 500
```

When I run the astream_events code on Colab, everything works fine, but when I try to do the same thing in the Flask API I run into this:

```
<async_generator object send_message_route.<locals>.generate_response at 0x000001C970E4AAC0>
Debugging middleware caught exception in streamed response at a point where response headers were already sent.
Traceback (most recent call last):
  File "D:\personal projects\Hi Edward\claims-coach-backend\venv\Lib\site-packages\werkzeug\wsgi.py", line 256, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "D:\personal projects\Hi Edward\claims-coach-backend\venv\Lib\site-packages\werkzeug\wrappers\response.py", line 32, in _iter_encoded
    for item in iterable:
TypeError: 'function' object is not iterable
```
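
The error happens because `generate_response` is an async generator: `stream_with_context` only recognizes regular generators, so it falls back to its decorator form and hands Werkzeug a plain function, which Werkzeug then cannot iterate. One way to keep the SSE endpoint under WSGI is to drive the async generator from a synchronous wrapper. A minimal sketch, assuming the rest of the route stays unchanged; `iter_over_async` is a hypothetical helper, not part of Flask or LangChain:

```python
import asyncio

def iter_over_async(agen):
    """Synchronously drain an async generator, one item at a time.

    Werkzeug iterates WSGI responses synchronously, so each chunk is
    pulled by stepping a private event loop.
    """
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()

# The route then returns a plain sync generator that Werkzeug can iterate:
# return Response(stream_with_context(iter_over_async(generate_response())),
#                 content_type='text/event-stream')
```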
Tags: flask, langchain, flask-restful, flask-socketio, langchain-agents
1 Answer

I ended up solving this by using sockets instead.

My full code:

```python
@chatbot_controller.route('/chat', methods=['POST'])
async def send_message_route():
    try:
        data = request.json
        session_id = data.get('session_id')
        user_input = data.get('user_input')
        user = g.user
        emit(f'chat_response/{session_id}', {'chunk': f"", 'session_id': session_id, "status": "Started"}, broadcast=True, namespace='/')
        if not session_id or not user_input:
            return jsonify({'error': get_error_message(400)}), 400

        session = get_session_chat(session_id)

        chat_history = []

        if session and 'messages' in session:
            for msg in session['messages']:
                user_message, bot_message = deserialize_message(msg)
                if user_message:
                    chat_history.append(user_message)
                if bot_message:
                    chat_history.append(bot_message)

        vector_store = get_vector_store()
        if vector_store is None:
            return jsonify({'error': 'Vector store initialization failed.'}), 500

        username = user['name']
        chat_model = get_large_language_model()
        prompt_template = create_prompt_template(f"When interacting with the user, refer to them as '{username}'. If the user requests to change their username, inform them that this can be done through the settings. If they ask for further instructions on how to do this, politely suggest they refer to the guide for more detailed steps. Try to keep your responses less than 100 words if possible. You will always output html")

        tools = [access_vector_store]

        agent = create_openai_tools_agent(chat_model, tools, prompt_template)

        agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
        response_chunks = []
        context_sources = []

        try:
            async for event in agent_executor.astream_events({"input": user_input, "chat_history": chat_history}, version="v1"):
                kind = event["event"]
                print(kind, ' ::::kind')

                # Handle different types of events
                if kind == "on_chain_start":
                    if (
                        event["name"] == "Agent"
                    ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
                        print(
                            f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
                        )
                        emit(f'chat_response/{session_id}', {'chunk': f"", 'session_id': session_id, "status": "Thinking"}, broadcast=True, namespace='/')

                elif kind == "on_chain_end":
                    if (
                        event["name"] == "Agent"
                    ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
                        print(
                            f"Finished agent: {event['name']} with output: {event['data'].get('output')}"
                        )
                        emit(f'chat_response/{session_id}', {'chunk': f"", 'session_id': session_id, "status": "Thinking"}, broadcast=True, namespace='/')

                if kind == "on_chat_model_stream":
                    chunk = event["data"]["chunk"].content
                    response_chunks.append(chunk)
                    # Spaces are swapped for a placeholder token so the client
                    # can restore them when rendering the stream.
                    chain_response = chunk.replace(' ', '<adding_space_token>')
                    emit(f'chat_response/{session_id}', {'chunk': f"data:{chain_response}\n\n", 'session_id': session_id, "status": "Streaming"}, broadcast=True, namespace='/')
                elif kind == "on_tool_start":
                    emit(f'chat_response/{session_id}', {'chunk': f"", 'session_id': session_id, "status": "tool"}, broadcast=True, namespace='/')
                elif kind == "on_tool_end":
                    emit(f'chat_response/{session_id}', {'chunk': f"", 'session_id': session_id, "status": "tool"}, broadcast=True, namespace='/')

                if kind == "on_retriever_end":
                    # Collect unique source documents returned by the retriever
                    if 'output' in event['data']:
                        if 'documents' in event['data']['output']:
                            output = event['data']['output']
                            for doc in output['documents']:
                                document_title = doc.metadata.get('document_title', None)
                                doc_id = doc.metadata.get('document_reference_ID', None)
                                if doc_id:
                                    exists = any(item['doc_id'] == doc_id for item in context_sources)
                                    if not exists:
                                        context_sources.append({
                                            'doc_id': doc_id,
                                            'title': document_title
                                        })

                    emit(f'chat_response/{session_id}', {'chunk': f"", 'session_id': session_id, "status": "tool", "context_sources": context_sources}, broadcast=True, namespace='/')
            emit(f'chat_response/{session_id}', {'chunk': '[DONE]', 'session_id': session_id}, broadcast=True, namespace='/')

        except Exception as e:
            print(f"Error during streaming: {e}")
            emit(f'chat_response/{session_id}', {'chunk': str(e), 'session_id': session_id, "status": "Done"}, broadcast=True, namespace='/')
        finally:
            bot_response = ''.join(response_chunks)
            print(context_sources, ' :::context_sources')
            # if len(bot_response) <= 0:
            #     return jsonify({'error': get_error_message(500)}), 500
            add_chat_message(session_id, user_input, bot_response, context_sources)
    except Exception as e:
        print(e)
        return jsonify({'error': get_error_message(500)}), 500
```
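On the client side, the chunks can then be consumed by listening for the `chat_response/<session_id>` event. A minimal sketch using the python-socketio client; the event name, `<adding_space_token>` replacement, and `[DONE]` sentinel mirror the server code above, while the URL, port, and `session_id` value are assumptions:

```python
import socketio

# Hypothetical standalone client; adjust the URL to the actual deployment.
sio = socketio.Client()
session_id = "example-session"  # assumed: must match the server's session_id

@sio.on(f'chat_response/{session_id}')
def on_chat_response(data):
    chunk = data.get('chunk', '')
    if chunk == '[DONE]':
        sio.disconnect()
        return
    # Strip the SSE-style framing and undo the space substitution.
    text = chunk.removeprefix('data:').rstrip('\n').replace('<adding_space_token>', ' ')
    print(text, end='', flush=True)

sio.connect('http://localhost:5000')
sio.wait()
```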