服务器发送事件 (SSE) 连接耗尽 Jetty 线程池

问题描述 投票:0回答:1

我们正在尝试使用

实现服务器发送事件(SSE)
  • RESTEasy(6.2.11.最终版)
  • 码头 (12.0.15)

但我们遇到的问题是,每个 SSE 订阅都会在整个订阅期间消耗并保留 Jetty 线程池中的一个线程。

有没有一种方法可以使用 RESTEasy / JAX-RS 提供 SSE 事件,而无需保留 Jetty 线程池中的线程?

实现SSE的代码

用于处理 SSE 订阅并每秒向订阅者推送事件的资源类:

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;

@Path("/events")
public class EventsResource {
      @GET
      @Produces("text/event-stream")
      public void subscribe(@Context SseEventSink sink, @Context Sse sse) throws InterruptedException {
            for (int i = 0; i < 100000; ++i) {
                  final OutboundSseEvent event = sse.newEventBuilder()
                              .name("message")
                              .data(String.class, i + ": Hello client!")
                              .build();
                  sink.send(event);
                  Thread.sleep(1000);
            }
      }
}

资源类已添加到我们的

Application
类中的类集中:

import java.util.HashSet;
import java.util.Set;
import jakarta.ws.rs.core.Application;

public class RestApplication extends Application {
    /** {@inheritDoc} */
    @Override
    public Set<Class<?>> getClasses() {
        Set<Class<?>> classes = new HashSet<>();
        classes.add(EventsResource.class);
        return classes;
    }
}

最后,我们有了主类,我们可以在其中使用应用程序和 5 个线程的最小线程池启动 Jetty:

import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.example.rest.RestApplication;
import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;

public class Main {
  public static void main(String[] args) throws Exception {
    Server server = new Server(new QueuedThreadPool(5));

    ServerConnector serverConnector = new ServerConnector(server);
    serverConnector.setPort(8081);
    server.setConnectors(new Connector[] { serverConnector });

    ServletHolder servletHolder = new ServletHolder(new HttpServletDispatcher());
    servletHolder.setInitParameter("jakarta.ws.rs.Application", RestApplication.class.getName());
    ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
    servletContextHandler.addServlet(servletHolder, "/*");
    server.setHandler(servletContextHandler);

    server.start();
    server.join();
  }
}

耗尽 Jetty 的线程池

订阅SSE时,我们得到:

% curl "http://localhost:8081/events" -H 'Accept: text/event-stream'

event: message
data: 0: Hello client!

event: message
data: 1: Hello client!

event: message
data: 2: Hello client!

...

如果我们启动 2 个并行

curl
命令,Jetty 线程池就会耗尽,第三个请求将挂起等待线程。

java jetty resteasy server-sent-events
1个回答
0
投票

首先将调度程序添加到您的班级中:

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

然后将你的方法更改为

  @GET
  @Produces("text/event-stream")
  public void subscribe(@Context SseEventSink sink, @Context Sse sse) throws InterruptedException {
    executor.scheduleAtFixedRate(()-> {
              final OutboundSseEvent event = sse.newEventBuilder()
                          .name("message")
                          .data(String.class, "Hello client!")
                          .build();
              sink.send(event);
    }, 0, 1, TimeUnit.SECONDS);
  }
© www.soinside.com 2019 - 2024. All rights reserved.