使用Langchain集成Stream API响应并创建一个工具将流响应推送到agent_executor

问题描述 投票:0回答:1

我正在尝试构建一个工具,它可以调用 HTTP API,它将在流中返回响应,我想向用户显示该流响应。我将分享使用 OpenAI 的实现作为示例,因为内部 API 也会生成类似的响应。

我已经准备了一个 async_generator,并且单独调用了此工具中定义的链,但是当我尝试通过 agent_executor.astream_events 调用此工具时,我收到此错误。

import json
import os
from typing import List, Any, Optional

from langchain.pydantic_v1 import BaseModel, Field
import aiohttp
from typing import Dict, Any, AsyncGenerator
import time
from langchain.chains.base import Chain


from app.langchain_helpers.llm_settings import get_llm_settings

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

class TestSchema(BaseModel):
    user_message: str = Field(default="", description="user_message")
    history: List[Any] = Field(default="", description="history")


async def stream_api_response(user_message: str):
    
    api_key = "YOUR_OPENAI_API_KEY"
    api_url = "https://api.openai.com/v1/chat/completions"
    
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json"
    }
    
    data = {
        "model": "gpt-4",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "write a sentence not more than 10 words."}
        ],
        "stream": True
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.post(api_url, json=data, headers=headers, ssl=False) as response:
            async for chunk in response.content.iter_any():
                chunk_str = chunk.decode('utf-8')
                lines = [line for line in chunk_str.splitlines() if line]
                
                # some data processing can be ignored.
                for line in lines:
                    if line.startswith("data: "):
                        json_str = line[len("data: "):]  # Remove the 'data: ' prefix
                        try:
                            # Parse the JSON response
                            json_data = json.loads(json_str)
                            
                            # Extract and return the content from the parsed JSON
                            if 'choices' in json_data and len(json_data['choices']) > 0:
                                choice = json_data['choices'][0]
                                if 'delta' in choice and 'content' in choice['delta']:
                                    yield choice['delta']['content']
                        except json.JSONDecodeError as e:
                            print(f"Error decoding JSON: {e}")


class StreamTestChain(Chain):
    async def _acall(self, inputs: Dict[str, Any]) -> AsyncGenerator[str, None]:
        user_input = inputs["user_input"]
        
        async for chunk in stream_api_response(user_input):
            if chunk:
                yield {"response": chunk, "tool": "TestTool"}

    def _call(self, inputs: Dict[str, Any]) -> str:
        raise NotImplementedError("Synchronous calls are not supported for streaming chains.")
    
    @property
    def input_keys(self) -> List[str]:
        return ["user_input"]

    @property
    def output_keys(self) -> List[str]:
        return ["response", "tool"]

class TestTool:
    def __init__(self, user_llm):
        self.user_llm = user_llm

    def generate_chain(self):
        return StreamTestChain(
            input_variables=["user_input"],
            output_variables=["response"]
        )
    
    def create_test_tool(self):
        test_tool = self.generate_chain().as_tool(
            name="TestTool",
            description=<TOOL_DESCRIPTION>,
        )
        return test_tool

现在下面的代码正在运行,我正在获取每个流响应的打印语句:

obj = TestTool("llm")
chain = obj.generate_chain()
async for response in chain._acall({"user_input": "something"}):
    print(response)

但是当从代理调用此工具时,我收到此错误:

TypeError("object async_generator can't be used in 'await' expression")Traceback (most recent call last):


  File "/venv/lib/python3.12/site-packages/langchain_core/tools/base.py", line 669, in arun
    response = await asyncio.create_task(coro, context=context)  # type: ignore
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/venv/lib/python3.12/site-packages/langchain_core/tools/structured.py", line 85, in _arun
    return await self.coroutine(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/venv/lib/python3.12/site-packages/langchain_core/tools/convert.py", line 290, in ainvoke_wrapper
    return await runnable.ainvoke(kwargs, config={"callbacks": callbacks})
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  File "/venv/lib/python3.12/site-packages/langchain/chains/base.py", line 215, in ainvoke
    raise e


  File "/venv/lib/python3.12/site-packages/langchain/chains/base.py", line 208, in ainvoke
    else **await self._acall(inputs)**
         ^^^^^^^^^^^^^^^^^^^^^^^^^


TypeError: object async_generator can't be used in 'await' expression

帮助我尝试解决这个问题,并指导如何绕过这个等待调用,即使我使用 astream_events() 调用来调用agent_executor。

python langchain
1个回答
0
投票

您遇到的错误 TypeError: object async_generator can't be use in 'await' expression 是因为您试图等待异步生成器,而这不是直接可能的。在 Python 中,您可以等待常规协程,但异步生成器需要使用 async for 或特定方法(如 .asend()、.athrow() 等)进行迭代。

解决方案 要绕过此问题,您需要在从 agent_executor 调用工具时正确处理异步生成器。具体来说,当您使用 agent_executor.astream_events() 时,它需要一个返回值(如字符串或对象)的协程,但您的代码是从异步生成器生成的。为了解决这个问题,您应该使该工具能够迭代生成器并收集结果,而不是直接等待异步生成器。

以下是修改 StreamTestChain 来解决此问题的方法:

逐步修复: 将异步生成器包装在使用它的函数中:我们不会直接调用 _acall,而是将其包装在使用生成器并返回结果的协程中。

修改 ainvoke 方法以处理异步生成器:确保当通过 agent_executor 调用该工具时,它会迭代异步生成器。

类 StreamTestChain(链): async def _acall(self, 输入:Dict[str, Any]) -> AsyncGenerator[str, None]: 用户输入=输入[“用户输入”]

    async for chunk in stream_api_response(user_input):
        if chunk:
            yield {"response": chunk, "tool": "RecipeTool"}

async def ainvoke(self, inputs: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collects all async generator results into a list."""
    result = []
    async for chunk in self._acall(inputs):
        result.append(chunk)
    return result

def _call(self, inputs: Dict[str, Any]) -> str:
    raise NotImplementedError("Synchronous calls are not supported for streaming chains.")

@property
def input_keys(self) -> List[str]:
    return ["user_input"]

@property
def output_keys(self) -> List[str]:
    return ["response", "tool"]
© www.soinside.com 2019 - 2024. All rights reserved.