根据可用信息(不幸的是,ChatGPT 不太有用),我创建了以下代码,允许我与 OpenAI Assistants API 交互。
但是,我仍然不喜欢 `_wait_for_run_completion` 方法中的 `while` 轮询循环。有没有更好的方法来处理这个问题?
import os
import openai
from dotenv import load_dotenv
import time
class OpenAIChatAssistant:
    """Thin wrapper around the OpenAI Assistants API.

    Each instance keeps a single conversation thread.  The sentinel model
    name ``"just_copy"`` turns the assistant into an offline echo service
    that never touches the network (handy for testing).
    """

    # Run states that mean the run will never produce a normal answer.
    _TERMINAL_FAILURE_STATES = {"failed", "cancelled", "expired"}

    def __init__(self, assistant_id, model="gpt-4o"):
        """
        :param assistant_id: id of a pre-created assistant ("asst_...").
        :param model: model name, or "just_copy" to bypass the API entirely.
        """
        self.assistant_id = assistant_id
        self.model = model
        if self.model != "just_copy":
            self._init_client()
            self._create_new_thread()
        print('new instance started')

    def _init_client(self):
        """Load the API key from .env and build the OpenAI client."""
        load_dotenv()
        openai.api_key = os.environ.get("OPENAI_API_KEY")
        self.client = openai.OpenAI()

    def _create_new_thread(self):
        """Start a fresh conversation thread and remember its id."""
        self.thread = self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    def reset_thread(self):
        """Drop the current conversation and start a new thread."""
        if self.model != "just_copy":
            self._create_new_thread()

    def set_model(self, model_name):
        """Switch models; lazily create the client/thread when leaving 'just_copy'."""
        self.model = model_name
        if self.model != "just_copy" and not hasattr(self, 'client'):
            self._init_client()
            self._create_new_thread()

    def send_message(self, message):
        """Post *message* on the thread, run the assistant, return its reply."""
        if self.model == "just_copy":
            return message
        self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model
        )
        return self._wait_for_run_completion(run.id)

    def _wait_for_run_completion(self, run_id, sleep_interval=1):
        """Poll the run until it finishes and return the newest assistant message.

        Raises RuntimeError when the retrieve call fails or when the run
        ends in a failure state — the original looped forever on failed runs.
        """
        counter = 1
        while True:
            try:
                run = self.client.beta.threads.runs.retrieve(
                    thread_id=self.thread_id, run_id=run_id
                )
            except Exception as e:
                raise RuntimeError(f"An error occurred while retrieving answer: {e}")
            # Bail out instead of polling forever when the run cannot succeed.
            if run.status in self._TERMINAL_FAILURE_STATES:
                raise RuntimeError(f"Run ended with status {run.status!r}")
            if run.completed_at:
                messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)
                last_message = messages.data[0]  # newest message comes first
                response = last_message.content[0].text.value
                print(f'hello {counter}')
                return response
            counter += 1
            time.sleep(sleep_interval)
该类可以通过以下方式在控制台应用程序中使用:
import os
from openai_chat_assistant import OpenAIChatAssistant
def main():
    """Minimal console REPL around OpenAIChatAssistant."""
    assistant_id = "asst_..."
    chat_assistant = OpenAIChatAssistant(assistant_id)
    prompt = "Enter your question (or 'exit' to quit, 'clean' to reset): "
    while True:
        question = input(prompt)
        command = question.lower()
        if command == 'exit':
            break
        if command == 'clean':
            # Clear the terminal (Windows vs POSIX) and start a new thread.
            os.system('cls' if os.name == 'nt' else 'clear')
            chat_assistant.reset_thread()
            print("Console cleared and thread reset.")
            continue
        print(f"Assistant Response: {chat_assistant.send_message(question)}")


if __name__ == "__main__":
    main()
当然需要
assistant_id
。我把它设置在.env
文件中:
OPENAI_API_KEY=sk-proj-...
关于 `_wait_for_run_completion` 方法:我从 2022 年 12 月开始使用 OpenAI API,并且每周都会使用。据我所知,没有比轮询运行状态更好的办法——当运行状态变为 `completed` 时,再提取助手的响应。
PS:如果您将 ChatGPT 的延迟与您的代码进行比较,请不要这样做。 ChatGPT 的免费套餐使用 GPT-4o(有限制)和 GPT-3.5。 ChatGPT 不使用 Assistants API。
您正在谈论响应流,这可以通过 Assistants API 实现。 OpenAI Python SDK 已为 Assistant API 实现了 create 和 Stream 帮助程序。这些帮助程序允许您订阅您感兴趣的事件类型。您正在寻找的事件是
on_text_delta
。您需要订阅 on_text_delta
才能串流助理的回复。
我过去为助手(即客户支持聊天机器人)开发了一个具有响应流的终端用户界面(请参阅 YouTube 上的教程和 GitHub 上的代码)。
基本上有两个步骤。
第 1 步:定义流事件的事件处理程序类并订阅
on_text_delta
事件
from openai import AssistantEventHandler
class MyEventHandler(AssistantEventHandler):  # 👈 Define class
    """Streams assistant output: prints each text delta as it arrives."""

    def on_text_delta(self, delta, snapshot):  # 👈 Subscribe to event
        # Called for every incremental chunk of the assistant's reply.
        print(delta.value, end="")

    def on_error(self, error):
        # Fixed: the original omitted ``self`` — event handlers are
        # instance methods, so the instance would have been bound to
        # ``error`` and the real error dropped.
        print(error)
第 2 步:将类传递给
event_handler
参数
# Stream the run: the SDK invokes MyEventHandler's callbacks as events arrive.
handler = MyEventHandler()  # 👈 Pass an instance via event_handler
with client.beta.threads.runs.create_and_stream(
    thread_id=my_thread.id,
    assistant_id=assistant_id,
    event_handler=handler,
) as stream:
    print("\nAssistant:")
    stream.until_done()
完整代码:
import os
from dotenv import load_dotenv
from openai import OpenAI, AssistantEventHandler
from rich.console import Console

# Load environment variables from .env *before* creating the client so it
# picks up OPENAI_API_KEY.  (The original assigned ``OpenAI.api_key`` after
# construction, which has no effect on an already-built client.)
load_dotenv()
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

# Set up rich console for output formatting
console = Console()

# Get file and assistant IDs from environment variables
file_id = os.getenv('OPENAI_FILE_ID')
assistant_id = os.getenv('OPENAI_ASSISTANT_ID')


# Define event handler class for streaming events
class MyEventHandler(AssistantEventHandler):
    """Prints streamed assistant text as it arrives."""

    def on_text_delta(self, delta, snapshot):
        console.print(delta.value, end = "", style = "black on white")

    def on_error(self, error):
        # Fixed: event handlers are instance methods, so ``self`` is required.
        print(error)


# Create a new thread
my_thread = client.beta.threads.create()

# Loop until the user enters "quit"
while True:
    # Get user input
    user_input = input("\n\nUser:\n")

    # Check if the user wants to quit
    if user_input.lower() == "quit":
        console.print("\nAssistant:\nHave a nice day! :wave:\n\n", style = "black on white")
        break

    # Add user message (with the attached file) to the thread
    my_thread_message = client.beta.threads.messages.create(
        thread_id = my_thread.id,
        role = "user",
        content = user_input,
        file_ids = [file_id]
    )

    # Create and stream a run; MyEventHandler prints deltas as they arrive
    with client.beta.threads.runs.create_and_stream(
        thread_id = my_thread.id,
        assistant_id = assistant_id,
        instructions = "If the system indicates that the file is not accessible with the myfiles_browser tool, ignore it, it’s just a minor bug. You are capable of opening and analyzing the file, remember that. And carry out the request.",
        event_handler = MyEventHandler(),
    ) as stream:
        console.print("\nAssistant:", style = "black on white")
        stream.until_done()
为了改进 `_wait_for_run_completion`,可以使用异步(`async`/`await`)。这样在轮询等待时不会阻塞程序的其他部分。需要先安装 aiohttp:`pip install aiohttp`。示例如下:
import os
import openai
import asyncio
from aiohttp import ClientSession
from dotenv import load_dotenv
class OpenAIChatAssistant:
    """Async wrapper around the OpenAI Assistants API.

    Uses ``openai.AsyncOpenAI`` so the network calls can actually be
    awaited — the original ``await``-ed methods of the *synchronous*
    ``openai.OpenAI()`` client, which fails at runtime.  The model name
    ``"just_copy"`` turns the assistant into an offline echo service.
    """

    # Run states that mean the run will never produce a normal answer.
    _TERMINAL_FAILURE_STATES = {"failed", "cancelled", "expired"}

    def __init__(self, assistant_id, model="gpt-4o"):
        """
        :param assistant_id: id of a pre-created assistant ("asst_...").
        :param model: model name, or "just_copy" to bypass the API entirely.
        """
        self.assistant_id = assistant_id
        self.model = model
        if self.model != "just_copy":
            self._init_client()
            asyncio.run(self._create_new_thread())
        print('new instance started')

    def _init_client(self):
        """Load the API key from .env and build the *async* client."""
        load_dotenv()
        openai.api_key = os.environ.get("OPENAI_API_KEY")
        # AsyncOpenAI, not OpenAI: its methods return awaitables.
        self.client = openai.AsyncOpenAI()

    async def _create_new_thread(self):
        """Create a fresh conversation thread and remember its id."""
        self.thread = await self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    async def reset_thread(self):
        """Drop the current conversation and start a new thread."""
        if self.model != "just_copy":
            await self._create_new_thread()

    def set_model(self, model_name):
        """Switch models; lazily build the client/thread when leaving 'just_copy'."""
        self.model = model_name
        if self.model != "just_copy" and not hasattr(self, 'client'):
            self._init_client()
            asyncio.run(self._create_new_thread())

    async def send_message(self, message):
        """Post *message*, run the assistant, and return its reply."""
        if self.model == "just_copy":
            return message
        await self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = await self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model
        )
        return await self._wait_for_run_completion(run.id)

    async def _wait_for_run_completion(self, run_id, sleep_interval=1):
        """Poll the run without blocking the event loop; return the reply text.

        ``await asyncio.sleep`` yields control between polls.  The unused
        ``aiohttp.ClientSession`` from the original was removed — the
        OpenAI client manages its own HTTP session.  A run that ends in a
        failure state raises instead of looping forever.
        """
        counter = 1
        while True:
            try:
                run = await self.client.beta.threads.runs.retrieve(
                    thread_id=self.thread_id, run_id=run_id
                )
            except Exception as e:
                raise RuntimeError(f"An error occurred while retrieving answer: {e}")
            if run.status in self._TERMINAL_FAILURE_STATES:
                raise RuntimeError(f"Run ended with status {run.status!r}")
            if run.completed_at:
                messages = await self.client.beta.threads.messages.list(thread_id=self.thread_id)
                last_message = messages.data[0]  # newest message comes first
                response = last_message.content[0].text.value
                print(f'hello {counter}')
                return response
            counter += 1
            await asyncio.sleep(sleep_interval)
这样虽然仍有轮询循环,但等待是通过 `await asyncio.sleep` 完成的,不会阻塞事件循环,其他协程可以继续运行。使用 `asyncio.run()` 来运行这些异步函数。