为什么不能从服务器端消费任何 sse 内容

问题描述 投票:0回答:0

我正在使用 Python fastapi 编写 SSE api,这是服务器端 api 代码:

@router.get("/v1/completions")
def stream_chat(q: str, authorization: str = Header(None)):
    auth_mode, auth_token = authorization.split(' ')
    if auth_token is None:
        return "Authorization token is missing"
    openai.api_key = auth_token
    async def event_generator():
        completions = openai.Completion.create(
            engine="text-davinci-003",
            prompt=q,
            max_tokens=50,
            n=1,
            stop=None,
            temperature=0.5,
            stream=True
        )
        for _ in completions:
            yield f"{_}\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")

然后我使用 curl 来测试这样的 api:

➜  ~ curl -N -H "Authorization: Bearer sk-balabalabala...." https://chat.poemhub.top/v1/completions\?q\=hello
{
  "choices": [
    {
      "finish_reason": null,
      "index": 0,
      "logprobs": null,
      "text": "\n"
    }
  ],
  "created": 1679625015,
  "id": "cmpl-6xRTzZKY7MvhQ4nKR39xSPqq9uCvW",
  "model": "text-davinci-003",
  "object": "text_completion"
}
{
  "choices": [
    {
      "finish_reason": null,
      "index": 0,
      "logprobs": null,
      "text": "\n"
    }
  ],
  "created": 1679625015,
  "id": "cmpl-6xRTzZKY7MvhQ4nKR39xSPqq9uCvW",
  "model": "text-davinci-003",
  "object": "text_completion"
}

这似乎工作正常,然后我想在 java 中使用消息,我尝试使用 okhttp 和 webclient,这是我的最小可运行代码:

public static void main(String[] args) {
    okHttpEvent();
    //consumeServerSentEvent();
}

public static void okHttpEvent() {
    Request request = new Request.Builder().url("https://chat.poemhub.top/v1/completions?q=hello").build();
    OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(new Interceptor() {
                @NonNull
                @Override
                public Response intercept(@NonNull Interceptor.Chain chain) throws IOException {
                    Request original = chain.request();
                    Request request = original.newBuilder()
                            .header("Authorization", "Bearer sk-balabalabala...")
                            .header("Accept",MediaType.TEXT_EVENT_STREAM.toString())
                            .method(original.method(), original.body())
                            .build();
                    return chain.proceed(request);
                }
            })
            .connectTimeout(1, TimeUnit.DAYS)
            .readTimeout(1, TimeUnit.DAYS)
            .build();

    RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
        private long callStartNanos;

        private void printEvent(String name) {
            long nowNanos = System.nanoTime();
            if (name.equals("callStart")) {
                callStartNanos = nowNanos;
            }
            long elapsedNanos = nowNanos - callStartNanos;
            System.out.printf("=====> %.3f %s%n", elapsedNanos / 1000000000d, name);
        }

        @Override
        public void onOpen(EventSource eventSource, Response response) {
            printEvent("onOpen");
        }

        @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            printEvent("onEvent");
            System.out.println(data);
        }

        @Override
        public void onClosed(EventSource eventSource) {
            printEvent("onClosed");
        }

        @Override
        public void onFailure(EventSource eventSource, Throwable t, Response response) {
            printEvent("onFailure");
        }
    });

    realEventSource.connect(okHttpClient);
}

public static void consumeServerSentEvent() throws InterruptedException {
    ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<>() {
    };
    WebClient client = WebClient.create("https://chat.poemhub.top");
    Flux<ServerSentEvent<String>> eventStream = client.get()
            .uri("/v1/completions?q=hello")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .headers(httpHeaders -> {
                httpHeaders.set("Authorization", "Bearer sk-balabalabala...");
                httpHeaders.set("Content-Type", MediaType.TEXT_EVENT_STREAM.toString());
            })
            .retrieve()
            .bodyToFlux(type);

    eventStream.subscribe(
            content -> {
                log.info("Time: {} - event: name[{}], id [{}], content[{}] ",
                        LocalTime.now(), content.event(), content.id(), content.data());
            },
            error -> {
                log.error("Error receiving SSE: {}", error);
            },
            () -> {
                log.info("Completed!!!");
            }
    );

    Thread.sleep(15000);
}

okhttp 和webclient 都可以连接成功,但是没有收到任何消息。我错过了什么吗?我应该怎么做才能解决这个问题并让客户成功使用 sse messgae?我现在已经尝试让线程休眠 15 秒,并确保 sse 消息将在 5 秒内响应。

java fastapi okhttp server-sent-events spring-webclient
© www.soinside.com 2019 - 2024. All rights reserved.