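Edit: Removed Thread.sleep(). Added the call to servletHolder.setAsyncSupported(true).
We are trying to implement Server-Sent Events (SSE) with RESTEasy, but each SSE subscription consumes a thread from Jetty's thread pool and holds it for the entire lifetime of the subscription.
If we use Jersey instead of RESTEasy, everything works as expected.
Is there a way to configure RESTEasy so that it does not exhaust Jetty's thread pool?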
The resource class that handles SSE subscriptions and pushes an event to each subscriber every second:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;
@Path("/events")
public class EventsResource {
private final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
private final static AtomicInteger channelIds = new AtomicInteger();
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context SseEventSink sink, @Context Sse sse) {
final int channelId = channelIds.getAndIncrement();
final Runnable sendMessage = () -> {
final OutboundSseEvent event = sse.newEventBuilder()
.name("message")
.data(String.class, "Hello client " + channelId + "!")
.build();
sink.send(event);
};
executor.scheduleAtFixedRate(sendMessage, 0, 1, TimeUnit.SECONDS);
}
}
The resource class is added to the set of classes in our Application class:
import java.util.HashSet;
import java.util.Set;
import jakarta.ws.rs.core.Application;
public class RestApplication extends Application {
/** {@inheritDoc} */
@Override
public Set<Class<?>> getClasses() {
Set<Class<?>> classes = new HashSet<>();
classes.add(EventsResource.class);
return classes;
}
}
Finally, we have the main class, where we start Jetty with the application and a minimal thread pool of 5 threads:
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.example.rest.RestApplication;
import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;
public class Main {
public static void main(String[] args) throws Exception {
Server server = new Server(new QueuedThreadPool(5));
ServerConnector serverConnector = new ServerConnector(server);
serverConnector.setPort(8081);
server.setConnectors(new Connector[] { serverConnector });
ServletHolder servletHolder = new ServletHolder(new HttpServletDispatcher());
servletHolder.setInitParameter("jakarta.ws.rs.Application", RestApplication.class.getName());
servletHolder.setAsyncSupported(true);
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
servletContextHandler.addServlet(servletHolder, "/*");
server.setHandler(servletContextHandler);
server.start();
server.join();
}
}
When subscribing to the SSE endpoint, we get:
% curl "http://localhost:8081/events" -H 'Accept: text/event-stream'
event: message
data: 0: Hello client 0!
event: message
data: 1: Hello client 0!
event: message
data: 2: Hello client 0!
...
If we start 2 parallel curl commands, the Jetty thread pool is exhausted and a third request hangs waiting for a thread.
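The exhaustion described above can be reproduced without Jetty or RESTEasy. The following is a minimal sketch, assuming a plain fixed-size pool stands in for the server's request threads and each "subscription" blocks its thread for as long as the client stays connected. The class and method names are hypothetical and not part of either library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolExhaustionDemo {

    /**
     * Starts the given number of "subscriptions", each of which holds a pool
     * thread until released, and returns how many are left waiting in the
     * queue without a thread.
     */
    public static int queuedSubscriptions(int poolSize, int subscriptions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        // Only this many tasks can actually obtain a thread.
        CountDownLatch started = new CountDownLatch(Math.min(poolSize, subscriptions));
        try {
            for (int i = 0; i < subscriptions; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // blocks like a handler that never returns
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // wait until every available thread is occupied
            return pool.getQueue().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Two threads, three long-lived subscriptions: one has to wait.
        System.out.println(queuedSubscriptions(2, 3)); // prints 1
    }
}
```

With one thread per subscription, the number of concurrent subscribers is capped by the pool size. An asynchronous dispatch avoids this by returning the request thread to the pool while the subscription stays open.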
First, add a scheduler to your class:
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Then change your method to:
@GET
@Produces("text/event-stream")
public void subscribe(@Context SseEventSink sink, @Context Sse sse) {
executor.scheduleAtFixedRate(()-> {
final OutboundSseEvent event = sse.newEventBuilder()
.name("message")
.data(String.class, "Hello client!")
.build();
sink.send(event);
}, 0, 1, TimeUnit.SECONDS);
}
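One thing the method above leaves open is that the scheduled task keeps firing after the client disconnects. Below is a minimal sketch of one way to stop it, assuming the sink exposes an isClosed() check as jakarta.ws.rs.sse.SseEventSink does. The EventSink interface, the SelfCancellingPush class and its methods are hypothetical stand-ins, so the example runs without a JAX-RS implementation.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class SelfCancellingPush {

    /** Hypothetical stand-in for SseEventSink (assumption, not the real API). */
    public interface EventSink {
        boolean isClosed();
        void send(String data);
    }

    /** Schedules a periodic push that cancels itself once the sink is closed. */
    public static ScheduledFuture<?> subscribe(
            ScheduledExecutorService scheduler, EventSink sink, long periodMillis) {
        AtomicReference<ScheduledFuture<?>> handle = new AtomicReference<>();
        Runnable push = () -> {
            if (sink.isClosed()) {
                ScheduledFuture<?> self = handle.get();
                // If the handle is not set yet, the next run cancels instead.
                if (self != null) {
                    self.cancel(false);
                }
                return;
            }
            sink.send("Hello client!");
        };
        ScheduledFuture<?> future =
                scheduler.scheduleAtFixedRate(push, 0, periodMillis, TimeUnit.MILLISECONDS);
        handle.set(future);
        return future;
    }

    /** Simulates a client that disconnects after a fixed number of events. */
    public static int demo(int eventsBeforeDisconnect) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger sent = new AtomicInteger();
        EventSink sink = new EventSink() {
            @Override
            public boolean isClosed() {
                return sent.get() >= eventsBeforeDisconnect;
            }

            @Override
            public void send(String data) {
                sent.incrementAndGet();
            }
        };
        try {
            ScheduledFuture<?> future = subscribe(scheduler, sink, 10);
            try {
                future.get(5, TimeUnit.SECONDS); // returns only via cancellation
            } catch (CancellationException expected) {
                // The task noticed the closed sink and cancelled itself.
            }
            return sent.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(3)); // prints 3
    }
}
```

In the resource method, the same pattern would mean checking sink.isClosed() at the start of each run and cancelling the ScheduledFuture returned by scheduleAtFixedRate, so that disconnected subscribers do not accumulate tasks in the shared executor.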