OpenAI 聊天集成 openai >1.0 - 性能

问题描述 投票:0回答:1

根据可用信息(不幸的是,ChatGPT 不太有用),我创建了以下代码,允许我与助手 API 进行交互。但是,我仍然不喜欢 _wait_for_run_completion 方法和 while 循环。有没有更好的方法来处理这个问题?

import os
import openai
from dotenv import load_dotenv
import time


class OpenAIChatAssistant:
    """Synchronous helper that talks to one OpenAI assistant over one thread.

    When ``model`` is the special value ``"just_copy"`` no API client is
    created and :meth:`send_message` simply echoes its input.

    Attributes set once a client exists:
        client: the ``openai.OpenAI`` client instance.
        thread: the thread object returned by ``client.beta.threads.create()``.
        thread_id: ``thread.id``, used for every subsequent request.
    """

    # Run statuses after which the run can no longer complete; polling
    # further would loop forever.  (Status names per the Assistants API
    # run object — confirm against the SDK version in use.)
    _FAILED_RUN_STATUSES = ("failed", "cancelled", "expired", "incomplete")

    def __init__(self, assistant_id, model="gpt-4o"):
        """Store the configuration and, unless in echo mode, open a thread.

        Args:
            assistant_id: id of the assistant that will execute the runs.
            model: model name passed to each run, or ``"just_copy"`` to
                skip the API entirely.
        """
        self.assistant_id = assistant_id
        self.model = model

        if self.model != "just_copy":
            self._init_client()
        print('new instance started')

    def _init_client(self):
        """Load the API key from the environment, build the client, open a thread."""
        load_dotenv()
        openai.api_key = os.environ.get("OPENAI_API_KEY")
        self.client = openai.OpenAI()
        self._create_new_thread()

    def _create_new_thread(self):
        """Create a fresh thread and remember it (and its id) on the instance."""
        self.thread = self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    def reset_thread(self):
        """Start a new conversation thread; a no-op in ``"just_copy"`` mode."""
        if self.model != "just_copy":
            self._create_new_thread()

    def set_model(self, model_name):
        """Switch the model; lazily create the client when leaving echo mode."""
        self.model = model_name

        if self.model != "just_copy" and not hasattr(self, 'client'):
            self._init_client()

    def send_message(self, message):
        """Post ``message`` to the thread, run the assistant, return its reply.

        Args:
            message: user text to send.

        Returns:
            The assistant's reply text, or ``message`` itself in
            ``"just_copy"`` mode.

        Raises:
            RuntimeError: if polling fails or the run ends unsuccessfully
                (see :meth:`_wait_for_run_completion`).
        """
        if self.model == "just_copy":
            return message

        self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model
        )
        return self._wait_for_run_completion(run.id)

    def _wait_for_run_completion(self, run_id, sleep_interval=1, timeout=None):
        """Poll a run until it finishes and return the reply text.

        Args:
            run_id: id of the run to watch.
            sleep_interval: seconds to sleep between polls.
            timeout: optional upper bound in seconds for the whole wait;
                ``None`` (default) waits indefinitely for a terminal state.

        Returns:
            ``text.value`` of the first content part of the first message
            returned by ``messages.list`` (assumed to be the newest message
            — confirm the API's default ordering).

        Raises:
            RuntimeError: if an API call fails, or the run reaches a
                terminal status other than completion.
            TimeoutError: if ``timeout`` is given and elapses first.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        counter = 1
        while True:
            try:
                run = self.client.beta.threads.runs.retrieve(
                    thread_id=self.thread_id, run_id=run_id
                )
            except Exception as e:
                raise RuntimeError(f"An error occurred while retrieving answer: {e}") from e

            if run.completed_at:
                try:
                    messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)
                    response = messages.data[0].content[0].text.value
                except Exception as e:
                    raise RuntimeError(f"An error occurred while retrieving answer: {e}") from e
                # Debug output: how many polls were needed.
                print(f'hello {counter}')
                return response

            # Without this check a failed/cancelled/expired run would be
            # polled forever, because completed_at never gets set.
            status = getattr(run, "status", None)
            if status in self._FAILED_RUN_STATUSES:
                details = getattr(run, "last_error", None)
                raise RuntimeError(
                    f"An error occurred while retrieving answer: "
                    f"run {run_id} ended with status '{status}' ({details})"
                )

            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Run {run_id} did not complete within {timeout} seconds"
                )

            counter += 1
            time.sleep(sleep_interval)
python python-3.x openai-api
1个回答
0
投票

为了使

_wait_for_run_completion
更好,请使用异步和
await
。这样在等待运行完成时不会阻塞其他协程;轮询用的 while 循环仍然需要,只是把 time.sleep() 换成了 await asyncio.sleep()。

  1. 为异步请求安装
    aiohttp
pip install aiohttp
  2. 更改代码以使用异步。

这里的例子:

import os
import openai
import asyncio
from aiohttp import ClientSession
from dotenv import load_dotenv

class OpenAIChatAssistant:
    """Asyncio-based helper that talks to one OpenAI assistant over one thread.

    When ``model`` is the special value ``"just_copy"`` no API client is
    created and :meth:`send_message` simply echoes its input.

    Attributes set once a client exists:
        client: an ``openai.AsyncOpenAI`` client, whose methods are awaitable.
        thread: the thread object returned by ``beta.threads.create()``.
        thread_id: ``thread.id``, used for every subsequent request.
    """

    # Run statuses after which the run can no longer complete; polling
    # further would loop forever.  (Status names per the Assistants API
    # run object — confirm against the SDK version in use.)
    _FAILED_RUN_STATUSES = ("failed", "cancelled", "expired", "incomplete")

    def __init__(self, assistant_id, model="gpt-4o"):
        """Store the configuration and, unless in echo mode, open a thread.

        Args:
            assistant_id: id of the assistant that will execute the runs.
            model: model name passed to each run, or ``"just_copy"`` to
                skip the API entirely.
        """
        self.assistant_id = assistant_id
        self.model = model

        if self.model != "just_copy":
            self._init_client()
        print('new instance started')

    def _init_client(self):
        """Build the async client and open the first thread synchronously.

        The constructor and :meth:`set_model` are plain (non-async)
        methods, and ``asyncio.run()`` cannot be called from inside a
        running event loop, so the initial thread is created with a
        short-lived synchronous client instead.
        """
        load_dotenv()
        openai.api_key = os.environ.get("OPENAI_API_KEY")
        # The coroutine methods below await client calls, which requires
        # the async client; the sync client's methods are not awaitable.
        self.client = openai.AsyncOpenAI()
        with openai.OpenAI() as sync_client:
            self._store_thread(sync_client.beta.threads.create())

    def _store_thread(self, thread):
        """Remember ``thread`` and its id on the instance."""
        self.thread = thread
        self.thread_id = thread.id
        print(self.thread_id)

    async def _create_new_thread(self):
        """Create a fresh thread through the async client and remember it."""
        self._store_thread(await self.client.beta.threads.create())

    async def reset_thread(self):
        """Start a new conversation thread; a no-op in ``"just_copy"`` mode."""
        if self.model != "just_copy":
            await self._create_new_thread()

    def set_model(self, model_name):
        """Switch the model; lazily create the client when leaving echo mode."""
        self.model = model_name

        if self.model != "just_copy" and not hasattr(self, 'client'):
            self._init_client()

    async def send_message(self, message):
        """Post ``message`` to the thread, run the assistant, return its reply.

        Args:
            message: user text to send.

        Returns:
            The assistant's reply text, or ``message`` itself in
            ``"just_copy"`` mode.

        Raises:
            RuntimeError: if polling fails or the run ends unsuccessfully
                (see :meth:`_wait_for_run_completion`).
        """
        if self.model == "just_copy":
            return message

        await self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = await self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model
        )
        return await self._wait_for_run_completion(run.id)

    async def _wait_for_run_completion(self, run_id, sleep_interval=1, timeout=None):
        """Poll a run until it finishes and return the reply text.

        The polling loop is still required; being a coroutine only means
        other tasks can run while this one sleeps between polls.

        Args:
            run_id: id of the run to watch.
            sleep_interval: seconds to sleep between polls.
            timeout: optional upper bound in seconds for the whole wait;
                ``None`` (default) waits indefinitely for a terminal state.

        Returns:
            ``text.value`` of the first content part of the first message
            returned by ``messages.list`` (assumed to be the newest message
            — confirm the API's default ordering).

        Raises:
            RuntimeError: if an API call fails, or the run reaches a
                terminal status other than completion.
            TimeoutError: if ``timeout`` is given and elapses first.
        """
        clock = asyncio.get_running_loop().time
        deadline = None if timeout is None else clock() + timeout
        counter = 1
        while True:
            try:
                run = await self.client.beta.threads.runs.retrieve(
                    thread_id=self.thread_id, run_id=run_id
                )
            except Exception as e:
                raise RuntimeError(f"An error occurred while retrieving answer: {e}") from e

            if run.completed_at:
                try:
                    messages = await self.client.beta.threads.messages.list(
                        thread_id=self.thread_id
                    )
                    response = messages.data[0].content[0].text.value
                except Exception as e:
                    raise RuntimeError(f"An error occurred while retrieving answer: {e}") from e
                # Debug output: how many polls were needed.
                print(f'hello {counter}')
                return response

            # Without this check a failed/cancelled/expired run would be
            # polled forever, because completed_at never gets set.
            status = getattr(run, "status", None)
            if status in self._FAILED_RUN_STATUSES:
                details = getattr(run, "last_error", None)
                raise RuntimeError(
                    f"An error occurred while retrieving answer: "
                    f"run {run_id} ended with status '{status}' ({details})"
                )

            if deadline is not None and clock() >= deadline:
                raise TimeoutError(
                    f"Run {run_id} did not complete within {timeout} seconds"
                )

            counter += 1
            await asyncio.sleep(sleep_interval)

这样在等待期间不会阻塞事件循环(轮询用的 while 循环仍然保留)。使用

asyncio.run()
运行异步函数。

© www.soinside.com 2019 - 2024. All rights reserved.