这是我的代码,用于从基于事件的 OpenAI 模型检索流响应。 (我只展示了核心部分)
client = OpenAI(api_key=OPEN_AI_API_KEY)


class EventHandler(AssistantEventHandler):
    """Handles streaming events emitted by an Assistants run."""

    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        # Fired once per incoming token. end="" keeps the tokens on one
        # line (plain print would add a newline per token), and flush=True
        # makes each token visible immediately instead of being buffered.
        print(delta.value, end="", flush=True)


with client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    event_handler=EventHandler(),
) as stream:
    # Blocks until the run finishes; the handler above is invoked
    # synchronously for every event in the meantime.
    stream.until_done()
on_text_delta 事件在令牌从 API 到达时触发。 我想使用 FastAPI 转发此响应,而不是在输出屏幕上打印。
@app.get("/stream")
async def stream():
# Placeholder pseudocode: `...something...` is not valid Python — the goal
# is to return the streamed tokens from this endpoint.
return ...something...
我尝试将响应结果作为 HTTP 正文的一部分:
from fastapi.responses import StreamingResponse
...
@app.post("/stream")
async def stream():
with client.beta.threads.runs.stream(
thread_id=thread_id,
assistant_id=assistant_id,
event_handler=EventHandler()
) as stream:
# BUG (the problem described below): until_done() blocks until the whole
# run has finished, so the return statement — and therefore the
# StreamingResponse — is only reached after every token has arrived.
stream.until_done()
# NOTE(review): generator_function is called on the CLASS, not on the
# handler instance that received the deltas, so it cannot see the data.
return StreamingResponse(EventHandler.generator_function(), media_type="text/plain")
我已经在 EventHandler 类中创建了 generator_function,但问题是,执行要等到流完全结束之后才会到达 return 语句。
我也尝试过 websockets,但问题仍然是我的程序执行应该如何流动。 在 API 响应完成之前,流不会继续执行。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
# TextDelta is not exported from the top-level `openai` module;
# it lives in the types subpackage.
from openai.types.beta.threads import TextDelta

# Initialize FastAPI and OpenAI client
app = FastAPI()
client = OpenAI(api_key="YOUR_OPENAI_API_KEY")  # Replace with your OpenAI API key
class EventHandler:
    """Buffers text deltas received from an Assistants run.

    NOTE(review): the sync openai stream invokes handler callbacks
    synchronously, so these are plain methods — the original ``async def``
    versions returned coroutines that the SDK never awaited, meaning the
    buffer was never filled. The ``delta`` argument only needs a ``.value``
    attribute (the new token text); the annotation on the broken top-level
    ``TextDelta`` import has been dropped so the class stands on its own.
    """

    def __init__(self):
        # Tokens received so far, in arrival order.
        self.response_data = []

    def on_text_delta(self, delta, snapshot):
        """Called once per token; append the new text fragment."""
        self.response_data.append(delta.value)

    def generator_function(self):
        """Yield the tokens buffered so far (a snapshot, not a live feed)."""
        for data in self.response_data:
            yield data
@app.post("/stream")
async def stream(thread_id: str, assistant_id: str):
    """Forward Assistants-run tokens to the client as they arrive.

    The original version called ``stream.until_done()`` and only then
    yielded the buffered tokens, so the client received everything in one
    burst after the run finished. The fix is to iterate the stream's
    ``text_deltas`` inside the generator: each token is yielded the moment
    the SDK produces it. Starlette runs a sync generator in a thread pool,
    so the blocking openai iteration does not stall the event loop.
    """

    def event_generator():
        with client.beta.threads.runs.stream(
            thread_id=thread_id,
            assistant_id=assistant_id,
        ) as run_stream:
            # text_deltas yields each text fragment as it is received.
            for token in run_stream.text_deltas:
                yield token

    return StreamingResponse(event_generator(), media_type="text/plain")
# Optional: test client to fetch the stream
if __name__ == "__main__":
    import uvicorn

    async def fetch_stream():
        """Consume /stream and print each chunk as soon as it arrives."""
        import httpx

        async with httpx.AsyncClient(timeout=None) as http_client:
            # httpx needs client.stream(...) for incremental reads;
            # ``async with client.post(...)`` fails because post() returns
            # a coroutine, not an async context manager.
            async with http_client.stream(
                "POST",
                "http://localhost:8000/stream",
                # The endpoint declares thread_id/assistant_id as query
                # parameters, so send them via params, not a JSON body.
                params={
                    "thread_id": "your_thread_id",
                    "assistant_id": "your_assistant_id",
                },
            ) as response:
                async for chunk in response.aiter_text():
                    print(chunk, end="", flush=True)

    # Run the FastAPI server
    uvicorn.run(app, host="0.0.0.0", port=8000)
    # To test the stream from another process instead:
    #   import asyncio; asyncio.run(fetch_stream())