我们正在尝试使用
实现服务器发送事件(SSE)但我们遇到的问题是,每个 SSE 订阅都会在整个订阅期间消耗并保留 Jetty 线程池中的一个线程。
有没有一种方法可以使用 RESTEasy / JAX-RS 提供 SSE 事件,而无需保留 Jetty 线程池中的线程?
用于处理 SSE 订阅并每秒向订阅者推送事件的资源类:
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;
@Path("/events")
public class EventsResource {
@GET
@Produces("text/event-stream")
public void subscribe(@Context SseEventSink sink, @Context Sse sse) throws InterruptedException {
for (int i = 0; i < 100000; ++i) {
final OutboundSseEvent event = sse.newEventBuilder()
.name("message")
.data(String.class, i + ": Hello client!")
.build();
sink.send(event);
Thread.sleep(1000);
}
}
}
资源类已添加到我们的
Application
类中的类集中:
import java.util.HashSet;
import java.util.Set;
import jakarta.ws.rs.core.Application;
public class RestApplication extends Application {
/** {@inheritDoc} */
@Override
public Set<Class<?>> getClasses() {
Set<Class<?>> classes = new HashSet<>();
classes.add(EventsResource.class);
return classes;
}
}
最后,我们有了主类,我们可以在其中使用应用程序和 5 个线程的最小线程池启动 Jetty:
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.example.rest.RestApplication;
import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;
public class Main {
public static void main(String[] args) throws Exception {
Server server = new Server(new QueuedThreadPool(5));
ServerConnector serverConnector = new ServerConnector(server);
serverConnector.setPort(8081);
server.setConnectors(new Connector[] { serverConnector });
ServletHolder servletHolder = new ServletHolder(new HttpServletDispatcher());
servletHolder.setInitParameter("jakarta.ws.rs.Application", RestApplication.class.getName());
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
servletContextHandler.addServlet(servletHolder, "/*");
server.setHandler(servletContextHandler);
server.start();
server.join();
}
}
订阅SSE时,我们得到:
% curl "http://localhost:8081/events" -H 'Accept: text/event-stream'
event: message
data: 0: Hello client!
event: message
data: 1: Hello client!
event: message
data: 2: Hello client!
...
如果我们启动 2 个并行
curl
命令,Jetty 线程池就会耗尽,第三个请求将挂起等待线程。
首先将调度程序添加到您的班级中:
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
然后将你的方法更改为
@GET
@Produces("text/event-stream")
public void subscribe(@Context SseEventSink sink, @Context Sse sse) throws InterruptedException {
executor.scheduleAtFixedRate(()-> {
final OutboundSseEvent event = sse.newEventBuilder()
.name("message")
.data(String.class, "Hello client!")
.build();
sink.send(event);
}, 0, 1, TimeUnit.SECONDS);
}