I am writing an SSE API with Python FastAPI. This is the server-side code:
@router.get("/v1/completions")
def stream_chat(q: str, authorization: str = Header(None)):
    auth_mode, auth_token = authorization.split(' ')
    if auth_token is None:
        return "Authorization token is missing"
    openai.api_key = auth_token
    async def event_generator():
        completions = openai.Completion.create(
            engine="text-davinci-003",
            prompt=q,
            max_tokens=50,
            n=1,
            stop=None,
            temperature=0.5,
            stream=True
        )
        for _ in completions:
            yield f"{_}\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")
Then I tested the API with curl:
➜ ~ curl -N -H "Authorization: Bearer sk-balabalabala...." https://chat.poemhub.top/v1/completions\?q\=hello
{
  "choices": [
    {
      "finish_reason": null,
      "index": 0,
      "logprobs": null,
      "text": "\n"
    }
  ],
  "created": 1679625015,
  "id": "cmpl-6xRTzZKY7MvhQ4nKR39xSPqq9uCvW",
  "model": "text-davinci-003",
  "object": "text_completion"
}
{
  "choices": [
    {
      "finish_reason": null,
      "index": 0,
      "logprobs": null,
      "text": "\n"
    }
  ],
  "created": 1679625015,
  "id": "cmpl-6xRTzZKY7MvhQ4nKR39xSPqq9uCvW",
  "model": "text-davinci-003",
  "object": "text_completion"
}
This seems to work fine. Next, I want to consume the messages in Java. I tried both OkHttp and WebClient; this is my minimal runnable code:
public static void main(String[] args) {
    okHttpEvent();
    //consumeServerSentEvent();
}
public static void okHttpEvent() {
    Request request = new Request.Builder().url("https://chat.poemhub.top/v1/completions?q=hello").build();
    OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(new Interceptor() {
                @NonNull
                @Override
                public Response intercept(@NonNull Interceptor.Chain chain) throws IOException {
                    Request original = chain.request();
                    Request request = original.newBuilder()
                            .header("Authorization", "Bearer sk-balabalabala...")
                            .header("Accept", MediaType.TEXT_EVENT_STREAM.toString())
                            .method(original.method(), original.body())
                            .build();
                    return chain.proceed(request);
                }
            })
            .connectTimeout(1, TimeUnit.DAYS)
            .readTimeout(1, TimeUnit.DAYS)
            .build();
    RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
        private long callStartNanos;
        private void printEvent(String name) {
            long nowNanos = System.nanoTime();
            if (name.equals("callStart")) {
                callStartNanos = nowNanos;
            }
            long elapsedNanos = nowNanos - callStartNanos;
            System.out.printf("=====> %.3f %s%n", elapsedNanos / 1000000000d, name);
        }
        @Override
        public void onOpen(EventSource eventSource, Response response) {
            printEvent("onOpen");
        }
        @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            printEvent("onEvent");
            System.out.println(data);
        }
        @Override
        public void onClosed(EventSource eventSource) {
            printEvent("onClosed");
        }
        @Override
        public void onFailure(EventSource eventSource, Throwable t, Response response) {
            printEvent("onFailure");
        }
    });
    realEventSource.connect(okHttpClient);
}
public static void consumeServerSentEvent() throws InterruptedException {
    ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<>() {
    };
    WebClient client = WebClient.create("https://chat.poemhub.top");
    Flux<ServerSentEvent<String>> eventStream = client.get()
            .uri("/v1/completions?q=hello")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .headers(httpHeaders -> {
                httpHeaders.set("Authorization", "Bearer sk-balabalabala...");
                httpHeaders.set("Content-Type", MediaType.TEXT_EVENT_STREAM.toString());
            })
            .retrieve()
            .bodyToFlux(type);
    eventStream.subscribe(
            content -> {
                log.info("Time: {} - event: name[{}], id [{}], content[{}] ",
                        LocalTime.now(), content.event(), content.id(), content.data());
            },
            error -> {
                log.error("Error receiving SSE: {}", error);
            },
            () -> {
                log.info("Completed!!!");
            }
    );
    Thread.sleep(15000);
}
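Both OkHttp and WebClient connect successfully, but neither receives any messages. Am I missing something? What should I do to fix this so that the client can consume the SSE messages? I have already tried making the thread sleep for 15 seconds, and I have confirmed that the SSE messages are sent within 5 seconds.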
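One thing that may be worth comparing against the curl output above is the wire format that `text/event-stream` clients parse: each event is one or more `data:` lines followed by a blank line. The sketch below only illustrates that framing. `format_sse` is a hypothetical helper, not part of FastAPI or the openai library, and whether the missing framing is the actual cause here is an assumption.

```python
import json
from typing import Optional


def format_sse(payload: str, event: Optional[str] = None) -> str:
    """Frame a text payload as a single Server-Sent Event (hypothetical helper)."""
    lines = []
    if event is not None:
        # Optional event name; clients see it as the event type.
        lines.append(f"event: {event}")
    # Every line of the payload gets its own "data:" field.
    for line in payload.splitlines() or [""]:
        lines.append(f"data: {line}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


# Example chunk shaped like the curl output, serialized on a single line.
chunk = {"choices": [{"text": "\n", "index": 0}]}
frame = format_sse(json.dumps(chunk))
print(repr(frame))
```

Under that assumption, the generator in the server code would yield `format_sse(json.dumps(chunk))` for each chunk instead of `f"{_}\n"`.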