服务器发送事件 (SSE) 与 RESTEasy 耗尽 Jetty 的线程池

问题描述 投票:0回答:1

编辑:

  • 使用执行器摆脱
    Thread.sleep()
  • 添加了对
    servletHolder.setAsyncSupported(true)
    的呼叫。

问题

我们正在尝试使用

实现服务器发送事件(SSE)
  • RESTEasy(6.2.11.最终版)
  • 码头 (12.0.15)

但我们遇到的问题是,每个 SSE 订阅都会在整个订阅期间消耗并保留 Jetty 线程池中的一个线程。

如果我们使用 Jersey 而不是 RESTEasy,一切都会按预期进行。

有没有办法配置RESTEasy不耗尽Jetty的线程池?

使用RESTEasy实现SSE的代码

用于处理 SSE 订阅并每秒向订阅者推送事件的资源类:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;

@Path("/events")
public class EventsResource {
    private final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    private final static AtomicInteger channelIds = new AtomicInteger();

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void subscribe(@Context SseEventSink sink, @Context Sse sse) {
          final int channelId = channelIds.getAndIncrement();
          final Runnable sendMessage = () -> {
                final OutboundSseEvent event = sse.newEventBuilder()
                    .name("message")
                    .data(String.class, "Hello client " + channelId + "!")
                    .build();
                sink.send(event);
          };
          executor.scheduleAtFixedRate(sendMessage, 0, 1, TimeUnit.SECONDS);
    }
}

资源类已添加到我们的

Application
类中的类集中:

import java.util.HashSet;
import java.util.Set;
import jakarta.ws.rs.core.Application;

public class RestApplication extends Application {
    /** {@inheritDoc} */
    @Override
    public Set<Class<?>> getClasses() {
        Set<Class<?>> classes = new HashSet<>();
        classes.add(EventsResource.class);
        return classes;
    }
}

最后,我们有了主类,我们可以在其中使用应用程序和 5 个线程的最小线程池启动 Jetty:

import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.example.rest.RestApplication;
import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;

public class Main {
  public static void main(String[] args) throws Exception {
    Server server = new Server(new QueuedThreadPool(5));

    ServerConnector serverConnector = new ServerConnector(server);
    serverConnector.setPort(8081);
    server.setConnectors(new Connector[] { serverConnector });

    ServletHolder servletHolder = new ServletHolder(new HttpServletDispatcher());
    servletHolder.setInitParameter("jakarta.ws.rs.Application", RestApplication.class.getName());
    servletHolder.setAsyncSupported(true);
    ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
    servletContextHandler.addServlet(servletHolder, "/*");
    server.setHandler(servletContextHandler);

    server.start();
    server.join();
  }
}

耗尽 Jetty 的线程池

订阅SSE时,我们得到:

% curl "http://localhost:8081/events" -H 'Accept: text/event-stream'

event: message
data: 0: Hello client 0!

event: message
data: 1: Hello client 0!

event: message
data: 2: Hello client 0!

...

如果我们启动 2 个并行

curl
命令,Jetty 线程池就会耗尽,第三个请求将挂起等待线程。

java jersey jetty resteasy server-sent-events
1个回答
0
投票

首先将调度程序添加到您的班级中:

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

然后将你的方法更改为

  @GET
  @Produces("text/event-stream")
  public void subscribe(@Context SseEventSink sink, @Context Sse sse) throws InterruptedException {
    executor.scheduleAtFixedRate(()-> {
              final OutboundSseEvent event = sse.newEventBuilder()
                          .name("message")
                          .data(String.class, "Hello client!")
                          .build();
              sink.send(event);
    }, 0, 1, TimeUnit.SECONDS);
  }
© www.soinside.com 2019 - 2024. All rights reserved.