根据现有资料(可惜 ChatGPT 帮助不大),我编写了下面的代码,用来与 OpenAI Assistants API 交互。不过,我仍然不满意 _wait_for_run_completion 方法以及其中用于轮询的 while 循环。有没有更好的方式来等待运行(run)完成?
import os
import openai
from dotenv import load_dotenv
import time
class OpenAIChatAssistant:
    """Synchronous helper that talks to one OpenAI assistant over one thread.

    When ``model`` is the sentinel ``"just_copy"`` no API client is created
    and :meth:`send_message` simply echoes its input back.
    """

    # Terminal run statuses for which ``completed_at`` is never populated.
    # Without checking them the polling loop would spin forever.
    _FAILED_RUN_STATUSES = frozenset(
        {"failed", "cancelled", "expired", "incomplete"}
    )

    def __init__(self, assistant_id, model="gpt-4o"):
        """Store the configuration and, unless echoing, open a thread.

        Args:
            assistant_id: ID of the assistant that will execute the runs.
            model: Model name passed to every run, or ``"just_copy"`` to
                skip the API entirely.
        """
        self.assistant_id = assistant_id
        self.model = model
        if self.model != "just_copy":
            self._connect()
        print('new instance started')

    def _connect(self):
        """Load credentials, build the API client and open a fresh thread."""
        load_dotenv()
        # Kept for backward compatibility with code that reads the
        # module-level key after this class has been instantiated.
        openai.api_key = os.environ.get("OPENAI_API_KEY")
        self.client = openai.OpenAI()
        self._create_new_thread()

    def _create_new_thread(self):
        """Create a new conversation thread and remember its ID."""
        self.thread = self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    def reset_thread(self):
        """Start a new conversation thread (no-op in ``"just_copy"`` mode)."""
        if self.model != "just_copy":
            self._create_new_thread()

    def set_model(self, model_name):
        """Switch the model used for subsequent runs.

        If the instance was created in ``"just_copy"`` mode and is now
        switched to a real model, the client and thread are created here.
        """
        self.model = model_name
        if self.model != "just_copy" and not hasattr(self, 'client'):
            self._connect()

    def send_message(self, message):
        """Post ``message`` to the thread, run the assistant, return its reply.

        Args:
            message: User text to send.

        Returns:
            The text of the newest message in the thread once the run has
            completed, or ``message`` itself in ``"just_copy"`` mode.

        Raises:
            RuntimeError: If polling fails or the run ends unsuccessfully.
        """
        if self.model == "just_copy":
            return message
        self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model,
        )
        return self._wait_for_run_completion(run.id)

    def _wait_for_run_completion(self, run_id, sleep_interval=1, timeout=None):
        """Poll a run until it finishes and return the assistant's reply.

        NOTE(review): recent openai-python releases provide
        ``client.beta.threads.runs.create_and_poll(...)``, which performs
        this polling inside the SDK - confirm the installed version before
        switching to it.

        Args:
            run_id: ID of the run to wait for.
            sleep_interval: Seconds to sleep between two polls.
            timeout: Optional overall limit in seconds; ``None`` (the
                default) waits indefinitely, as before.

        Returns:
            The text value of the first content part of the newest message.

        Raises:
            RuntimeError: If an API call fails, or if the run reaches a
                terminal status other than completion.
            TimeoutError: If ``timeout`` is given and elapses first.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        counter = 1
        while True:
            try:
                run = self.client.beta.threads.runs.retrieve(
                    thread_id=self.thread_id, run_id=run_id
                )
                if run.completed_at:
                    messages = self.client.beta.threads.messages.list(
                        thread_id=self.thread_id
                    )
                    # The list is read newest-first, so index 0 is the reply.
                    last_message = messages.data[0]
                    response = last_message.content[0].text.value
                    print(f'hello {counter}')
                    return response
            except Exception as e:
                # Chain the cause so the original traceback is preserved.
                raise RuntimeError(
                    f"An error occurred while retrieving answer: {e}"
                ) from e

            status = getattr(run, "status", None)
            if status in self._FAILED_RUN_STATUSES:
                raise RuntimeError(
                    f"Run {run_id} ended with status '{status}': "
                    f"{getattr(run, 'last_error', None)}"
                )
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Run {run_id} did not complete within {timeout} seconds"
                )
            counter += 1
            time.sleep(sleep_interval)
要改进 _wait_for_run_completion,可以改用 async/await。这样在等待运行完成期间不会阻塞其他任务;用于轮询的 while 循环仍然保留,只是其中的 sleep 换成了非阻塞的 asyncio.sleep。
示例中还导入了 aiohttp,需要先安装:pip install aiohttp
示例如下:
import os
import openai
import asyncio
from aiohttp import ClientSession
from dotenv import load_dotenv
class OpenAIChatAssistant:
    """Asynchronous helper that talks to one OpenAI assistant over one thread.

    All API calls go through ``openai.AsyncOpenAI`` so they can be awaited.
    The conversation thread is created lazily on first use, which keeps the
    constructor synchronous and safe to call from inside a running event
    loop. When ``model`` is the sentinel ``"just_copy"`` no client is
    created and :meth:`send_message` echoes its input back.
    """

    # Terminal run statuses for which ``completed_at`` is never populated.
    # Without checking them the polling loop would spin forever.
    _FAILED_RUN_STATUSES = frozenset(
        {"failed", "cancelled", "expired", "incomplete"}
    )

    def __init__(self, assistant_id, model="gpt-4o"):
        """Store the configuration and, unless echoing, build the client.

        Args:
            assistant_id: ID of the assistant that will execute the runs.
            model: Model name passed to every run, or ``"just_copy"`` to
                skip the API entirely.
        """
        self.assistant_id = assistant_id
        self.model = model
        # Filled in by _create_new_thread() the first time it is needed.
        self.thread = None
        self.thread_id = None
        if self.model != "just_copy":
            self._ensure_client()
        print('new instance started')

    def _ensure_client(self):
        """Create the async API client once; later calls are no-ops."""
        if getattr(self, "client", None) is None:
            load_dotenv()
            # Kept for backward compatibility with code that reads the
            # module-level key after this class has been instantiated.
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.AsyncOpenAI()

    async def _create_new_thread(self):
        """Create a new conversation thread and remember its ID."""
        self.thread = await self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    async def reset_thread(self):
        """Start a new conversation thread (no-op in ``"just_copy"`` mode)."""
        if self.model != "just_copy":
            self._ensure_client()
            await self._create_new_thread()

    def set_model(self, model_name):
        """Switch the model used for subsequent runs.

        Stays synchronous: it only makes sure a client exists. The thread is
        created by the next awaited call, so no nested ``asyncio.run()`` is
        needed here.
        """
        self.model = model_name
        if self.model != "just_copy":
            self._ensure_client()

    async def send_message(self, message):
        """Post ``message`` to the thread, run the assistant, return its reply.

        Args:
            message: User text to send.

        Returns:
            The text of the newest message in the thread once the run has
            completed, or ``message`` itself in ``"just_copy"`` mode.

        Raises:
            RuntimeError: If polling fails or the run ends unsuccessfully.
        """
        if self.model == "just_copy":
            return message
        self._ensure_client()
        if getattr(self, "thread_id", None) is None:
            await self._create_new_thread()
        await self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = await self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model,
        )
        return await self._wait_for_run_completion(run.id)

    async def _wait_for_run_completion(self, run_id, sleep_interval=1):
        """Poll a run until it finishes and return the assistant's reply.

        The loop yields to the event loop between polls, so other tasks keep
        running. Callers that need an upper bound can wrap the call in
        ``asyncio.wait_for``.

        NOTE(review): recent openai-python releases provide
        ``client.beta.threads.runs.create_and_poll(...)``, which performs
        this polling inside the SDK - confirm the installed version before
        switching to it.

        Args:
            run_id: ID of the run to wait for.
            sleep_interval: Seconds to sleep between two polls.

        Returns:
            The text value of the first content part of the newest message.

        Raises:
            RuntimeError: If an API call fails, or if the run reaches a
                terminal status other than completion.
        """
        counter = 1
        while True:
            try:
                run = await self.client.beta.threads.runs.retrieve(
                    thread_id=self.thread_id, run_id=run_id
                )
                if run.completed_at:
                    messages = await self.client.beta.threads.messages.list(
                        thread_id=self.thread_id
                    )
                    # The list is read newest-first, so index 0 is the reply.
                    last_message = messages.data[0]
                    response = last_message.content[0].text.value
                    print(f'hello {counter}')
                    return response
            except Exception as e:
                # Chain the cause so the original traceback is preserved.
                raise RuntimeError(
                    f"An error occurred while retrieving answer: {e}"
                ) from e

            status = getattr(run, "status", None)
            if status in self._FAILED_RUN_STATUSES:
                raise RuntimeError(
                    f"Run {run_id} ended with status '{status}': "
                    f"{getattr(run, 'last_error', None)}"
                )
            counter += 1
            await asyncio.sleep(sleep_interval)
这样等待期间不会阻塞事件循环(轮询循环仍然存在,只是改用了非阻塞的 asyncio.sleep)。
请使用 asyncio.run() 来运行这些异步函数。