How to forward an OpenAI streaming response with FastAPI in Python?


Here is my code for retrieving a streaming response from the event-based OpenAI Assistants API (only the core part is shown):

client = OpenAI(api_key=OPEN_AI_API_KEY)

class EventHandler(AssistantEventHandler):
    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        print(delta.value)

with client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    event_handler=EventHandler()
) as stream:
    stream.until_done()

The on_text_delta event fires as tokens arrive from the API. Instead of printing them to the console, I want to forward this response through FastAPI.

@app.get("/stream")
async def stream():
    return ...something...

I tried returning the result as part of the HTTP body:

from fastapi.responses import StreamingResponse

...

@app.post("/stream")
async def stream():
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        event_handler=EventHandler()
    ) as stream:
        stream.until_done()

    return StreamingResponse(EventHandler.generator_function(), media_type="text/plain")

I have already created a generator_function in the EventHandler class, but the problem is that execution does not reach the return statement until the stream has finished.

I also tried websockets, but the problem is still the control flow of my program: nothing is handed back until the API response is complete.

python api stream fastapi openai-api
1 Answer
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI, AssistantEventHandler
from openai.types.beta.threads import Text, TextDelta
from typing_extensions import override

# Initialize FastAPI and OpenAI client
app = FastAPI()
client = OpenAI(api_key="YOUR_OPENAI_API_KEY")  # Replace with your OpenAI API key

class EventHandler(AssistantEventHandler):
    @override
    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        # Per-token side effects (logging, metrics, ...) go here;
        # the HTTP response itself is fed from stream.text_deltas below
        pass

@app.post("/stream")
async def stream(thread_id: str, assistant_id: str):
    def stream_response():
        # Open the run stream and yield each text delta as it arrives,
        # instead of waiting for until_done() and returning everything at once
        with client.beta.threads.runs.stream(
            thread_id=thread_id,
            assistant_id=assistant_id,
            event_handler=EventHandler(),
        ) as stream:
            for text in stream.text_deltas:
                yield text

    # StreamingResponse runs the sync generator in a threadpool and
    # forwards each chunk to the client as it is produced
    return StreamingResponse(stream_response(), media_type="text/plain")
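If you do need to produce the response from the custom on_text_delta callbacks rather than from stream.text_deltas, the usual pattern is to run the blocking SDK stream in a worker thread and hand each chunk to the response generator through a queue. A minimal, OpenAI-free sketch of that bridge (the producer here is a stand-in for the real run stream, which would call emit(delta.value) from on_text_delta):

```python
import queue
import threading

def bridge(run_blocking_stream):
    """Run a blocking producer in a thread and yield its output as it arrives.

    run_blocking_stream(emit) must call emit(text) for each chunk and
    return when done; None is used as an end-of-stream sentinel.
    """
    q: queue.Queue = queue.Queue()

    def worker():
        try:
            run_blocking_stream(q.put)
        finally:
            q.put(None)  # signal end of stream

    threading.Thread(target=worker, daemon=True).start()
    while (chunk := q.get()) is not None:
        yield chunk

# Hypothetical producer: in the real app this would open
# client.beta.threads.runs.stream(...) and emit each text delta
def fake_producer(emit):
    for piece in ["Hel", "lo ", "world"]:
        emit(piece)

print("".join(bridge(fake_producer)))  # → Hello world
```

The generator returned by bridge() can be passed straight to StreamingResponse, so the endpoint starts sending data while the worker thread is still consuming the OpenAI stream.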

# Optional: test client to fetch the stream (run it in a separate
# process while the server below is running)
async def fetch_stream():
    import httpx

    async with httpx.AsyncClient() as http:
        async with http.stream(
            "POST",
            "http://localhost:8000/stream",
            # The endpoint declares plain str parameters, so FastAPI
            # reads them from the query string, not a JSON body
            params={"thread_id": "your_thread_id", "assistant_id": "your_assistant_id"},
        ) as response:
            async for chunk in response.aiter_text():
                print(chunk, end="")

if __name__ == "__main__":
    import uvicorn

    # uvicorn.run() blocks, so start the test client from another
    # process, e.g. with asyncio.run(fetch_stream())
    uvicorn.run(app, host="0.0.0.0", port=8000)