Asynchronous event-driven servlet for long polling in Tomcat 9

Question (0 votes, 1 answer)

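I am looking for best practices for implementing long polling in Tomcat 9, so that the server can push messages to clients over HTTPS.

Synchronous I/O does not seem to be an efficient solution, because each connection occupies a thread, so thousands of connected clients would mean thousands of threads.

Is there a way to create an asynchronous, event-driven servlet in Tomcat 9 so that all connections can be handled by a fixed thread pool?

I found the Comet API, but it appears to be deprecated and unsupported in Tomcat 9: "HTTP method GET is not supported by this URL with Comet processor".

I know the WebSockets API is an alternative, but WebSockets use a TCP port other than the HTTPS one, which some of our clients do not accept (they only want to allow communication on port 443).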

java asynchronous tomcat servlets tomcat9
1 Answer (0 votes)

I was able to create an event-driven long-polling servlet based on Servlet API 3.0 and AsyncContext.

The pattern I used is publish-subscribe.
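To make the publish-subscribe idea concrete before the servlet code, here is a minimal sketch that uses only the JDK. The class `PubSubSketch` and its methods are hypothetical names chosen for illustration. The `Consumer<String>` callback stands in for whatever would write the response and complete the pending long-poll request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Minimal publish-subscribe registry; all names here are hypothetical. */
public class PubSubSketch {

    // One pending subscriber per client ID. The callback stands in for the
    // code that would write the response and complete the async request.
    private final Map<String, Consumer<String>> subscribers = new ConcurrentHashMap<>();

    /** Registers a one-shot callback for the given client. */
    public void subscribe(String clientId, Consumer<String> callback) {
        subscribers.put(clientId, callback);
    }

    /**
     * Delivers the message if the client is currently subscribed.
     * Returns false when nobody is waiting, so the caller can store or drop it.
     */
    public boolean publish(String clientId, String message) {
        // remove() makes the subscription one-shot, like a single long-poll request
        Consumer<String> callback = subscribers.remove(clientId);
        if (callback == null) {
            return false;
        }
        callback.accept(message);
        return true;
    }

    public static void main(String[] args) {
        PubSubSketch bus = new PubSubSketch();
        List<String> received = new ArrayList<>();
        bus.subscribe("client-1", received::add);
        System.out.println(bus.publish("client-1", "hello")); // a subscriber is waiting
        System.out.println(bus.publish("client-1", "again")); // nobody is waiting until the next poll
        System.out.println(received);
    }
}
```

The subscription is removed on delivery because a long-poll request can be answered only once; the client is expected to issue a new request (and thus subscribe again) after each response.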

Entry in pom.xml:

<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>javax.servlet-api</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>

Servlet code:

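import java.io.IOException;
import java.util.List;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns = "/rest/polling/*", asyncSupported = true)
public class LongPollingServlet extends HttpServlet {

    private final MessageProducer messageProducer = MessageProducer.getInstance();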

    @Override
    protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) 
        throws ServletException, IOException {
        req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        final AsyncContext asyncContext = req.startAsync();
        asyncContext.setTimeout(60*1000L);
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                List<Message> messages = messageProducer.getMessages(event.getAsyncContext());
                
                resp.setStatus(200);
                // TODO Output messages to resp.getOutputStream() according to the client-server protocol
                
                messageProducer.unsubscribe(event.getAsyncContext());
            }
            
            @Override
            public void onTimeout(AsyncEvent event) throws IOException { }
            
            @Override
            public void onError(AsyncEvent event) throws IOException {
                messageProducer.unsubscribe(event.getAsyncContext());
            }
            
            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
            }
        });
        
        String clientId = ...   // TODO Somehow get the client ID, for example from the request URL path
        messageProducer.subscribe(clientId, asyncContext);
    }
}

A data class that stores information about a client that has called the long-polling method (that is, a "subscribed" client):

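import java.util.LinkedList;
import java.util.List;
import javax.servlet.AsyncContext;

public class OnlineClient {
    public final String clientId;
    public final AsyncContext context;
    // Pending messages; callers synchronize on this list before touching it
    public final List<Message> messages = new LinkedList<>();

    public OnlineClient(String clientId, AsyncContext context) {
        this.clientId = clientId;
        this.context = context;
    }
}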

Message producer code:

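import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.AsyncContext;

public class MessageProducer {

    // Singleton, created eagerly so that initialization is thread-safe
    private static final MessageProducer INSTANCE = new MessageProducer();
    private MessageProducer() {}
    public static MessageProducer getInstance() {
        return INSTANCE;
    }

    // Subscribed clients; concurrent maps because servlet threads and publishers access them in parallel
    private final Map<AsyncContext, OnlineClient> contextMap = new ConcurrentHashMap<>();
    private final Map<String, OnlineClient> clientMap = new ConcurrentHashMap<>();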
    
    public void subscribe(String clientId, AsyncContext context) {
        OnlineClient client = new OnlineClient(clientId, context);
        contextMap.put(context, client);
        clientMap.put(clientId, client);
    }
    
    public void unsubscribe(AsyncContext context) {
        OnlineClient client = contextMap.get(context);
        if (client != null) {
            clientMap.remove(client.clientId);
            contextMap.remove(context);
        }
    }
    
    public List<Message> getMessages(AsyncContext context) {
        OnlineClient client = contextMap.get(context);
        List<Message> result = new LinkedList<>();
        if (client != null) {
            synchronized (client.messages) {
                result.addAll(client.messages);
                client.messages.clear();
            }
        }
        return result;
    }

    // Called elsewhere when somebody wants to send a message
    public void publish(String clientId, Message message) {
        OnlineClient client = clientMap.get(clientId);
        if (client == null) {
            // Client is not subscribed
            // Here we can save the unsent message elsewhere or just do nothing
            return;
        }
        
        synchronized (client.messages) {
            client.messages.add(message);
        }
        // Notify the servlet processing thread and call onComplete()
        client.context.complete();
    }
}
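One thing to watch in the producer above: two `publish` calls for the same client in quick succession could each reach `client.context.complete()` before `unsubscribe` runs, and a container may reject a second `complete()` on the same context (typically with an `IllegalStateException`). A possible guard, sketched here with only the JDK, is to wrap the completion in a run-once helper. `CompleteOnce` is a hypothetical class, not part of the Servlet API, and the `Runnable` stands in for the real `complete()` call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Wraps a completion action so that it runs at most once. Hypothetical helper. */
public class CompleteOnce {

    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final Runnable completion;

    /** @param completion stand-in for a call such as client.context.complete() */
    public CompleteOnce(Runnable completion) {
        this.completion = completion;
    }

    /** Runs the completion action on the first call only; returns whether it ran. */
    public boolean complete() {
        // compareAndSet lets exactly one caller win, even across threads
        if (completed.compareAndSet(false, true)) {
            completion.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        CompleteOnce once = new CompleteOnce(() -> calls[0]++);
        once.complete();
        once.complete(); // ignored: already completed
        System.out.println("completion ran " + calls[0] + " time(s)");
    }
}
```

Under this assumption, `OnlineClient` could hold a `CompleteOnce` built from `context::complete`, and `publish` would call it instead of calling `complete()` directly; messages added after the first completion would still need to be kept for the client's next poll.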