I'm trying to build a tool that calls an HTTP API which returns its response as a stream, and I want to display that streamed response to the user. I'm sharing an implementation that uses OpenAI as the example, since our internal API produces a similar response.
I've prepared an async generator, and the chain defined in this tool works when I call it on its own, but when I try to invoke the tool through agent_executor.astream_events, I get the error below.
import json
import os
from typing import Any, AsyncGenerator, Dict, List

import aiohttp
from langchain.pydantic_v1 import BaseModel, Field
from langchain.chains.base import Chain
from app.langchain_helpers.llm_settings import get_llm_settings

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")


class TestSchema(BaseModel):
    user_message: str = Field(default="", description="user_message")
    history: List[Any] = Field(default_factory=list, description="history")


async def stream_api_response(user_message: str):
    api_url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "model": "gpt-4",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": user_message},
        ],
        "stream": True,
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(api_url, json=data, headers=headers, ssl=False) as response:
            async for chunk in response.content.iter_any():
                chunk_str = chunk.decode("utf-8")
                lines = [line for line in chunk_str.splitlines() if line]
                # some data processing can be ignored.
                for line in lines:
                    if line.startswith("data: "):
                        json_str = line[len("data: "):]  # Remove the 'data: ' prefix
                        try:
                            # Parse the JSON response
                            json_data = json.loads(json_str)
                            # Extract and yield the content from the parsed JSON
                            if 'choices' in json_data and len(json_data['choices']) > 0:
                                choice = json_data['choices'][0]
                                if 'delta' in choice and 'content' in choice['delta']:
                                    yield choice['delta']['content']
                        except json.JSONDecodeError as e:
                            print(f"Error decoding JSON: {e}")
class StreamTestChain(Chain):
    async def _acall(self, inputs: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        user_input = inputs["user_input"]
        async for chunk in stream_api_response(user_input):
            if chunk:
                yield {"response": chunk, "tool": "TestTool"}

    def _call(self, inputs: Dict[str, Any]) -> str:
        raise NotImplementedError("Synchronous calls are not supported for streaming chains.")

    @property
    def input_keys(self) -> List[str]:
        return ["user_input"]

    @property
    def output_keys(self) -> List[str]:
        return ["response", "tool"]
class TestTool:
    def __init__(self, user_llm):
        self.user_llm = user_llm

    def generate_chain(self):
        return StreamTestChain(
            input_variables=["user_input"],
            output_variables=["response"],
        )

    def create_test_tool(self):
        test_tool = self.generate_chain().as_tool(
            name="TestTool",
            description=<TOOL_DESCRIPTION>,
        )
        return test_tool
The following code runs fine, and I get a print statement for every streamed chunk:
obj = TestTool("llm")
chain = obj.generate_chain()
async for response in chain._acall({"user_input": "something"}):
    print(response)
But when this tool is invoked from the agent, I get this error:
TypeError("object async_generator can't be used in 'await' expression")
Traceback (most recent call last):
  File "/venv/lib/python3.12/site-packages/langchain_core/tools/base.py", line 669, in arun
    response = await asyncio.create_task(coro, context=context)  # type: ignore
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/langchain_core/tools/structured.py", line 85, in _arun
    return await self.coroutine(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/langchain_core/tools/convert.py", line 290, in ainvoke_wrapper
    return await runnable.ainvoke(kwargs, config={"callbacks": callbacks})
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/langchain/chains/base.py", line 215, in ainvoke
    raise e
  File "/venv/lib/python3.12/site-packages/langchain/chains/base.py", line 208, in ainvoke
    else await self._acall(inputs)
         ^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object async_generator can't be used in 'await' expression
Please help me fix this, and advise how to get around this await call so that streaming still works when I invoke agent_executor via astream_events().
The error you're hitting, TypeError: object async_generator can't be used in 'await' expression, occurs because you are trying to await an async generator, which is not possible directly. In Python you can await regular coroutines, but an async generator has to be iterated with async for (or driven with methods like .asend() and .athrow()).
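A minimal stdlib-only sketch of that distinction, with a hypothetical gen() standing in for your generator (no LangChain involved):

```python
import asyncio
from typing import Any, Dict, List


async def gen():
    # Because this function contains `yield`, calling gen() returns an
    # async_generator object, not a coroutine.
    for i in range(3):
        yield i


async def try_await() -> str:
    # Awaiting the generator object reproduces the agent's error.
    try:
        await gen()  # type: ignore[misc]
        return "no error"
    except TypeError as e:
        return str(e)


async def collect() -> List[int]:
    # `async for` is the correct way to consume an async generator.
    return [i async for i in gen()]


error_message = asyncio.run(try_await())
items = asyncio.run(collect())
print(error_message)
print(items)
```

This is exactly what happens inside Chain.ainvoke: LangChain does `await self._acall(inputs)`, and because your _acall yields, the awaited object is an async generator.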
Solution: To get around this, the tool needs to handle the async generator correctly when it is called from agent_executor. Specifically, agent_executor.astream_events() expects a coroutine that returns a value (such as a string or an object), but your code yields from an async generator. To fix this, make the chain iterate the generator and collect the results, instead of letting LangChain await the async generator directly.
Here is how to modify StreamTestChain to resolve this:
Step-by-step fix: Wrap the async generator in a function that consumes it: rather than calling _acall directly, wrap it in a coroutine that consumes the generator and returns the results.
Override the ainvoke method to handle the async generator: this ensures that when the tool is invoked through agent_executor, the async generator is iterated rather than awaited.
class StreamTestChain(Chain):
    async def _acall(self, inputs: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        user_input = inputs["user_input"]
        async for chunk in stream_api_response(user_input):
            if chunk:
                yield {"response": chunk, "tool": "TestTool"}

    async def ainvoke(self, inputs: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collects all async generator results into a list."""
        result = []
        async for chunk in self._acall(inputs):
            result.append(chunk)
        return result

    def _call(self, inputs: Dict[str, Any]) -> str:
        raise NotImplementedError("Synchronous calls are not supported for streaming chains.")

    @property
    def input_keys(self) -> List[str]:
        return ["user_input"]

    @property
    def output_keys(self) -> List[str]:
        return ["response", "tool"]
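Stripped of LangChain, the core of the fix is just a coroutine that drains the async generator into a list; a coroutine can be awaited, so callers like agent_executor no longer trip over the generator. A stdlib-only sketch, with a hypothetical fake_stream standing in for stream_api_response:

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List


async def fake_stream(user_message: str) -> AsyncGenerator[str, None]:
    # Stand-in for stream_api_response: yields chunks one at a time.
    for word in user_message.split():
        yield word


async def collect_stream(user_message: str) -> List[Dict[str, Any]]:
    """Coroutine wrapper: iterates the generator and returns a plain list.

    Unlike the async generator itself, this coroutine can safely be awaited,
    which is the same role ainvoke plays in the modified chain above.
    """
    result: List[Dict[str, Any]] = []
    async for chunk in fake_stream(user_message):
        result.append({"response": chunk, "tool": "TestTool"})
    return result


chunks = asyncio.run(collect_stream("hello streaming world"))
print(chunks)
```

Note the trade-off: collecting into a list means the tool returns everything in one piece at the tool boundary, rather than chunk by chunk, so the streaming happens inside the tool but the agent sees a single aggregated result.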