我正在使用 Python 开发 AI 代理的自定义实现,其中代理依赖 langchain 工具(方法)与 PostgreSQL 数据库交互。 ToolWithConnection 类初始化数据库连接并定义 list_tables、tables_schema、check_sql 和execute_sql_to_df 等工具。这些工具随后被 CrewAI 代理用来执行各种任务。
这是代码的相关部分:
连接方式:
def connection(x):
postgres_schm = x["database_schm"]
postgres_host = x["database_host"]
postgres_port = x["database_port"]
postgres_user = x["database_user"]
postgres_pass = x["database_pass"]
postgres_dryr = x["database_dryr"]
if postgres_dryr == "true":
postgres_pass_dec = postgres_pass
else:
postgres_pass_dec = decrypt_string(postgres_pass)
CONNECTION_STRING = (
f"postgresql+psycopg2://{postgres_user}:{postgres_pass_dec}@{postgres_host}:{postgres_port}/{postgres_schm}"
)
db = SQLDatabase.from_uri(
CONNECTION_STRING,
schema=postgres_schm,
view_support=True
)
return db
工具类:
class ToolWithConnection:
def __init__(self, x):
"""Initialize with a connection based on the input 'x'."""
self.db = connection(x)
print('>> Initialize db connection ✔\n')
@tool("list_tables")
def list_tables(self) -> str:
"""List the available tables in the database."""
return ListSQLDatabaseTool(db=self.db).invoke("")
@tool("tables_schema")
def tables_schema(self, tables: str) -> str:
"""Get the schema of specified tables."""
tool = InfoSQLDatabaseTool(db=self.db)
return tool.invoke(tables)
CrewAI 包装器:
class CrewAIWrapper:
def __init__(self, x):
self.x = x
self.llm_crew = LLM(model='azure/GPT4o')
tool_instance = ToolWithConnection(self.x)
self.sql_dev = Agent(
role="Senior Database Developer",
goal="Construct and execute PostgreSQL queries based on a query {query}",
tools=[
tool_instance.list_tables,
tool_instance.tables_schema,
],
allow_delegation=False,
memory=False,
verbose=True,
)
def kickoff_crew(self):
extract_data_task = Task(
description="Extract data that is required for the query {query}.",
agent=self.sql_dev,
)
my_crew = Crew(
agents=[self.sql_dev],
tasks=[extract_data_task],
verbose=True,
)
try:
crew_result = my_crew.kickoff(
inputs={"query": self.x['question']}
)
return crew_result.raw
except Exception as e:
print(f"Error during task execution: {str(e)}")
链条设置:
class CustomCrewAILLM(LLM):
model: str
def __init__(self, model='azure/GPT4o'):
super().__init__(model=model)
@property
def _llm_type(self):
return 'custom_crew_ai'
def _call(self, obj: dict,) -> str:
crew_ai_wrapper = CrewAIWrapper(obj, )
result = crew_ai_wrapper.kickoff_crew()
print(type(result))
# print(result)
return result
class InputType(BaseModel):
crewai_input: Optional[str]
db: Optional[SQLDatabase] = Field(default=None, exclude=True) # Exclude from schema validation
database_schm: str
database_host: str
database_port: str
database_user: str
database_pass: str
database_name: str
database_dryr: str
class Config:
arbitrary_types_allowed = True # Allow non-Pydantic types like SQLDatabase
# @field_validator("crewai_input")
# def validate_input(cls, v):
# if not isinstance(v, str):
# raise ValidationError(f"Expected crewai_input to be a str, got {type(v)} instead.")
# return v
class OutputType(BaseModel):
crewai_output: Optional[str]
# @field_validator("crewai_output")
# def validate_output(cls, v):
# if not isinstance(v, str):
# raise ValidationError(f"Expected crewai_output to be a str, got {type(v)} instead.")
# return v
chain = (
RunnablePassthrough.assign(
crewai_output=RunnableLambda(lambda x: CustomCrewAILLM()._call(x))
)
).with_types(input_type=InputType,
output_type=OutputType
)
问题:
当我使用代理和工具运行 CrewAIWrapper 时,list_tables 工具出现以下错误:
I encountered an error while trying to use the tool. This was the error: ToolWithConnection.list_tables() missing 1 required positional argument: 'self'.
问题:
为什么调用list_tables时缺少self参数?
如何正确地将实例方法(如 list_tables)从类传递给代理而不会遇到此错误?
我设法通过将每个工具设为一个单独的类来找到解决方案。 以下是我如何构建工具的示例:
每个工具都作为一个独立的类实现,具有名称、功能和描述的属性。这种封装提供了清晰度和模块化。
class ListTablesTool:
def __init__(self, db_connection):
self.name = "list_tables"
self.func = self.list_tables
self.description = "Lists the available tables in the database"
self.db_connection = db_connection
def list_tables(self) -> str:
"""List the available tables in the database"""
return ListSQLDatabaseTool(db=self.db_connection).invoke("")
class TablesSchemaTool:
def __init__(self, db_connection):
self.name = "tables_schema"
self.func = self.tables_schema
self.description = "Gets schema and sample rows for specified tables"
self.db_connection = db_connection
def tables_schema(self, tables: str) -> str:
"""Fetch schema for the provided tables"""
tool = InfoSQLDatabaseTool(db=self.db_connection)
return tool.invoke(tables)
在主工作流程或启动过程中,工具的初始化如下:
self.list_tables = ListTablesTool(self.db)
self.tables_schema = TablesSchemaTool(self.db)
我希望这有帮助!