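I am looking for best practices for implementing long polling in Tomcat 9, so that the server can push messages to clients over HTTPS.
Synchronous I/O does not seem to be a workable solution, because every connection occupies a thread, so thousands of connected clients would mean thousands of threads.
Is there a way to create an asynchronous, event-driven servlet in Tomcat 9 so that all connections can be handled by a fixed thread pool?
I found the Comet API, but it appears to be deprecated and unsupported in Tomcat 9: HTTP method GET is not supported by this URL with Comet processor
I know the WebSocket API is an alternative, but WebSockets use a TCP port other than the HTTPS one, and some of our clients do not accept that port (they only want to allow communication on port 443).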
I was able to create a long-polling, event-driven servlet based on Servlet API 3.0 and AsyncContext.
The pattern I use is publish-subscribe.
Entry in pom.xml:
<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>javax.servlet-api</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>
Servlet code:
import java.io.IOException;
import java.util.List;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns = "/rest/polling/*", asyncSupported = true)
public class LongPollingServlet extends HttpServlet {
    private final MessageProducer messageProducer = MessageProducer.getInstance();

    @Override
    protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
            throws ServletException, IOException {
        // Tomcat-specific request attribute that marks the request as async-capable
        req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        final AsyncContext asyncContext = req.startAsync();
        asyncContext.setTimeout(60 * 1000L); // end the poll after 60 seconds without a message
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                // Drain whatever was queued for this client, then drop the subscription
                List<Message> messages = messageProducer.getMessages(event.getAsyncContext());
                resp.setStatus(200);
                // TODO Output messages to resp.getOutputStream() according to the client-server protocol
                messageProducer.unsubscribe(event.getAsyncContext());
            }

            @Override
            public void onTimeout(AsyncEvent event) throws IOException { }

            @Override
            public void onError(AsyncEvent event) throws IOException {
                messageProducer.unsubscribe(event.getAsyncContext());
            }

            @Override
            public void onStartAsync(AsyncEvent event) throws IOException { }
        });
        String clientId = ... // TODO Somehow get the client ID, for example from the request URL path
        messageProducer.subscribe(clientId, asyncContext);
    }
}
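The TODO in onComplete() depends entirely on the protocol agreed with the client, which is not specified here. As one possible illustration only, the sketch below assumes a hypothetical line-based format: each message is a UTF-8 string written on its own line, with embedded line breaks escaped. The class name LineProtocolWriter and the use of plain String payloads are assumptions; the Message type used above would need its own conversion to text.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical wire format: one message per line, UTF-8, newline-terminated. */
final class LineProtocolWriter {
    private LineProtocolWriter() {}

    /** Writes each payload as a single line; backslashes and line breaks are escaped. */
    static void write(List<String> payloads, OutputStream out) throws IOException {
        StringBuilder body = new StringBuilder();
        for (String payload : payloads) {
            String escaped = payload
                    .replace("\\", "\\\\")  // escape the escape character first
                    .replace("\n", "\\n")
                    .replace("\r", "\\r");
            body.append(escaped).append('\n');
        }
        out.write(body.toString().getBytes(StandardCharsets.UTF_8));
        out.flush();
    }
}
```

Inside the servlet this could be called with resp.getOutputStream() as the target stream, after the messages have been converted to strings.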
A data class that stores information about the client that called the long-polling method (the "subscribed" client):
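import java.util.LinkedList;
import java.util.List;
import javax.servlet.AsyncContext;

public class OnlineClient {
    public final String clientId;
    public final AsyncContext context;
    // Messages queued for this client; guard access with synchronized (messages)
    public final List<Message> messages = new LinkedList<>();

    public OnlineClient(String clientId, AsyncContext context) {
        this.clientId = clientId;
        this.context = context;
    }
}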
Message producer code:
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.AsyncContext;

public class MessageProducer {
    // Singleton (static, so that MessageProducer.getInstance() can be called)
    private static MessageProducer _messageProducer = null;

    private MessageProducer() {}

    public static synchronized MessageProducer getInstance() {
        if (_messageProducer == null) {
            _messageProducer = new MessageProducer();
        }
        return _messageProducer;
    }

    // Subscribed clients; concurrent maps because subscribe() and publish() run on different threads
    private final Map<AsyncContext, OnlineClient> contextMap = new ConcurrentHashMap<>();
    private final Map<String, OnlineClient> clientMap = new ConcurrentHashMap<>();

    public void subscribe(String clientId, AsyncContext context) {
        OnlineClient client = new OnlineClient(clientId, context);
        contextMap.put(context, client);
        clientMap.put(clientId, client);
    }

    public void unsubscribe(AsyncContext context) {
        OnlineClient client = contextMap.remove(context);
        if (client != null) {
            // Remove only this subscription, not a newer one made by the same client
            clientMap.remove(client.clientId, client);
        }
    }

    public List<Message> getMessages(AsyncContext context) {
        OnlineClient client = contextMap.get(context);
        List<Message> result = new LinkedList<>();
        if (client != null) {
            synchronized (client.messages) {
                result.addAll(client.messages);
                client.messages.clear();
            }
        }
        return result;
    }

    // Called elsewhere when somebody wants to send a message
    public void publish(String clientId, Message message) {
        OnlineClient client = clientMap.get(clientId);
        if (client == null) {
            // Client is not subscribed
            // Here we can save the unsent message elsewhere or just do nothing
            return;
        }
        synchronized (client.messages) {
            client.messages.add(message);
        }
        // Complete the pending request; the container then calls onComplete() in the servlet's listener
        client.context.complete();
    }
}
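To see the subscribe/publish/timeout flow without a servlet container, it can be modelled in plain Java. The sketch below is not the code above: it swaps AsyncContext for a CompletableFuture as a stand-in for the pending request, and PollingBroker, its method names, and the String message type are all hypothetical. It needs Java 9 or later for completeOnTimeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Stand-in for the servlet flow: a pending poll is a future, and publish() completes it. */
final class PollingBroker {
    // At most one pending poll per client ID
    private final Map<String, CompletableFuture<List<String>>> pending = new ConcurrentHashMap<>();

    /** Registers a poll that ends with an empty list if no message arrives in time. */
    CompletableFuture<List<String>> subscribe(String clientId, long timeout, TimeUnit unit) {
        CompletableFuture<List<String>> poll = new CompletableFuture<>();
        CompletableFuture<List<String>> previous = pending.put(clientId, poll);
        if (previous != null) {
            previous.complete(new ArrayList<>()); // release a poll replaced by a newer one
        }
        poll.completeOnTimeout(new ArrayList<>(), timeout, unit);
        // Drop the registration when the poll ends, unless a newer poll has replaced it
        poll.whenComplete((messages, error) -> pending.remove(clientId, poll));
        return poll;
    }

    /** Delivers a message; returns false if the client has no pending poll or it already ended. */
    boolean publish(String clientId, String message) {
        CompletableFuture<List<String>> poll = pending.remove(clientId);
        if (poll == null) {
            return false; // not subscribed: the caller may store the message or drop it
        }
        List<String> batch = new ArrayList<>();
        batch.add(message);
        return poll.complete(batch);
    }

    public static void main(String[] args) {
        PollingBroker broker = new PollingBroker();
        CompletableFuture<List<String>> poll = broker.subscribe("alice", 5, TimeUnit.SECONDS);
        broker.publish("alice", "hi");
        System.out.println(poll.join());
    }
}
```

As in the servlet version, no thread is blocked while a poll is pending: the waiting state lives in the map, and the thread that calls publish() is the one that triggers the response.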