OpenAI Assistants API:有没有更好的方法来等待助手的响应,以及如何增量显示助手的答案?

问题描述 投票:0回答:2

根据可用信息(不幸的是,ChatGPT 不太有用),我创建了以下代码,允许我与 OpenAI Assistants API 交互。

但是,我仍然不喜欢

_wait_for_run_completion
方法和
while
循环。有没有更好的方法来处理这个问题?

import os
import openai
from dotenv import load_dotenv
import time


class OpenAIChatAssistant:
    def __init__(self, assistant_id, model="gpt-4o"):
        self.assistant_id = assistant_id
        self.model = model

        if self.model != "just_copy":
            load_dotenv()
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.OpenAI()
            self._create_new_thread()
        print('new instance started')

    def _create_new_thread(self):
        self.thread = self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    def reset_thread(self):
        if self.model != "just_copy":
            self._create_new_thread()

    def set_model(self, model_name):
        self.model = model_name

        if self.model != "just_copy" and not hasattr(self, 'client'):
            load_dotenv()
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.OpenAI()
            self._create_new_thread()

    def send_message(self, message):
        if self.model == "just_copy":
            return message

        self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model
        )
        return self._wait_for_run_completion(run.id)

    def _wait_for_run_completion(self, run_id, sleep_interval=1):
        counter = 1
        while True:
            try:
                run = self.client.beta.threads.runs.retrieve(thread_id=self.thread_id, run_id=run_id)
                if run.completed_at:
                    messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)
                    last_message = messages.data[0]
                    response = last_message.content[0].text.value
                    print(f'hello {counter}')
                    return response
            except Exception as e:
                raise RuntimeError(f"An error occurred while retrieving answer: {e}")
            counter += 1
            time.sleep(sleep_interval)

该类可以通过以下方式在控制台应用程序中使用:

import os
from openai_chat_assistant import OpenAIChatAssistant


def main():
    assistant_id = "asst_..."
    chat_assistant = OpenAIChatAssistant(assistant_id)

    while True:
        question = input("Enter your question (or 'exit' to quit, 'clean' to reset): ")
        if question.lower() == 'exit':
            break
        elif question.lower() == 'clean':
            os.system('cls' if os.name == 'nt' else 'clear')
            chat_assistant.reset_thread()
            print("Console cleared and thread reset.")
        else:
            response = chat_assistant.send_message(question)
            print(f"Assistant Response: {response}")


if __name__ == "__main__":
    main()

当然需要

assistant_id
。我把它设置在
.env
文件中:

OPENAI_API_KEY=sk-proj-...
python python-3.x openai-api openai-assistants-api
2个回答
1
投票

关于
_wait_for_run_completion
方法

我于 2022 年 12 月开始使用 OpenAI API,并每周使用一次。据我所知,没有比简单地检查运行状态更好的方法来处理获取助手的响应,并且当运行状态移动到

completed
时,提取助手的响应。

PS:如果您将 ChatGPT 的延迟与您的代码进行比较,请不要这样做。 ChatGPT 的免费套餐使用 GPT-4o(有限制)和 GPT-3.5。 ChatGPT 不使用 Assistants API。

关于“增量答案加载”

您正在谈论响应流,这可以通过 Assistants API 实现。 OpenAI Python SDK 已为 Assistant API 实现了 create 和 Stream 帮助程序。这些帮助程序允许您订阅您感兴趣的事件类型。您正在寻找的事件是

on_text_delta
。您需要订阅
on_text_delta
才能串流助理的回复。

我过去为助手(即客户支持聊天机器人)开发了一个具有响应流的终端用户界面(请参阅 YouTube 上的教程和 GitHub 上的代码)。

基本上有两个步骤。

第 1 步:定义流事件的事件处理程序类并订阅

on_text_delta
事件

from openai import AssistantEventHandler

class MyEventHandler(AssistantEventHandler): # 👈 Define class
    def on_text_delta(self, delta, snapshot): # 👈 Subscribe to event
        print(delta.value, end = "")

    def on_error(error):
        print(error)

第 2 步:将类传递给

event_handler
参数

with client.beta.threads.runs.create_and_stream(
        thread_id = my_thread.id,
        assistant_id = assistant_id,
        event_handler = MyEventHandler(), # 👈 Pass class
    ) as stream:
        print("\nAssistant:")
        stream.until_done()

完整代码

import os
from dotenv import load_dotenv
from openai import OpenAI, AssistantEventHandler
from rich.console import Console

# Load environment variables from .env
load_dotenv()

# Set up OpenAI client
client = OpenAI()
OpenAI.api_key = os.getenv('OPENAI_API_KEY')

# Set up rich console for output formatting
console = Console()

# Get file and assistant IDs from environment variables
file_id = os.getenv('OPENAI_FILE_ID')
assistant_id = os.getenv('OPENAI_ASSISTANT_ID')

# Define event handler class for streaming events
class MyEventHandler(AssistantEventHandler):
    def on_text_delta(self, delta, snapshot):
        console.print(delta.value, end = "", style = "black on white")

    def on_error(error):
        print(error)

# Create a new thread
my_thread = client.beta.threads.create()

# Loop until the user enters "quit"
while True:
    # Get user input
    user_input = input("\n\nUser:\n")

    # Check if the user wants to quit
    if user_input.lower() == "quit":
        console.print("\nAssistant:\nHave a nice day! :wave:\n\n", style = "black on white")
        break

    # Add user message to the thread
    my_thread_message = client.beta.threads.messages.create(
        thread_id = my_thread.id,
        role = "user",
        content = user_input,
        file_ids = [file_id]
    )

    # Create and stream a run
    with client.beta.threads.runs.create_and_stream(
        thread_id = my_thread.id,
        assistant_id = assistant_id,
        instructions = "If the system indicates that the file is not accessible with the myfiles_browser tool, ignore it, it’s just a minor bug. You are capable of opening and analyzing the file, remember that. And carry out the request.",
        event_handler = MyEventHandler(),
    ) as stream:
        console.print("\nAssistant:", style = "black on white")
        stream.until_done()

0
投票

为了使

_wait_for_run_completion
更好,请使用异步和
await
。这使得它更快并且不需要 while 循环。

  1. 为异步请求安装
    aiohttp
pip install aiohttp
  1. 更改代码以使用异步。

这里的例子:

import os
import openai
import asyncio
from aiohttp import ClientSession
from dotenv import load_dotenv

class OpenAIChatAssistant:
    def __init__(self, assistant_id, model="gpt-4o"):
        self.assistant_id = assistant_id
        self.model = model

        if self.model != "just_copy":
            load_dotenv()
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.OpenAI()
            asyncio.run(self._create_new_thread())
        print('new instance started')

    async def _create_new_thread(self):
        self.thread = self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    async def reset_thread(self):
        if self.model != "just_copy":
            await self._create_new_thread()

    def set_model(self, model_name):
        self.model = model_name

        if self.model != "just_copy" and not hasattr(self, 'client'):
            load_dotenv()
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.OpenAI()
            asyncio.run(self._create_new_thread())

    async def send_message(self, message):
        if self.model == "just_copy":
            return message

        await self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = await self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model
        )
        return await self._wait_for_run_completion(run.id)

    async def _wait_for_run_completion(self, run_id, sleep_interval=1):
        counter = 1
        async with ClientSession() as session:
            while True:
                try:
                    run = await self.client.beta.threads.runs.retrieve(thread_id=self.thread_id, run_id=run_id)
                    if run.completed_at:
                        messages = await self.client.beta.threads.messages.list(thread_id=self.thread_id)
                        last_message = messages.data[0]
                        response = last_message.content[0].text.value
                        print(f'hello {counter}')
                        return response
                except Exception as e:
                    raise RuntimeError(f"An error occurred while retrieving answer: {e}")
                counter += 1
                await asyncio.sleep(sleep_interval)

这使得它更快并且没有 while 循环。使用

asyncio.run()
运行异步函数。

© www.soinside.com 2019 - 2024. All rights reserved.