我有一个 websocket 端点,允许客户端订阅与特定对话相关的消息。由于在任何给定时间都有多个服务实例,因此消息保存在 MongoDb 集合中。因此,每个 Web 套接字会话都需要启动一个轮询器,仅查询与其相关的消息。
多个 Websocket 会话也可能对同一对话感兴趣。
Websocket 客户端预计在握手期间提供对话 ID 查询参数(尽管我认为这可能会成为 URL 的一部分):
ws://my.api.com/wsflow/websocket?conversationId=12345
我创建了一个
ServerWebSocketContainer
并设置了一个HandshakeInterceptor
来获取conversationId查询参数并将其保存为属性。然后,我设置一个 WebSocketHandlerDecorator 并覆盖 afterConnectionEstablished()
以创建一个轮询与该对话 ID 相关的消息的流。对于每条消息,它都会使用 WS 会话的 id 来丰富 sessionId
标头。然后它将其发送到另一个通道,该通道的另一端有一个 WebSocketOutboundMessageHandler。
为了防止重复消息并允许多个会话订阅同一对话,我在文档中包含了一个“sessions”数组。因此,轮询器查询将搜索具有所提供的会话 id 的消息,并且当前的 ws 会话 id 不在已经看到该消息的会话数组中。
这意味着入站适配器还必须有一个更新语句,该语句执行
push
操作,将当前会话 ID 附加到有效负载中的数组中。这感觉有点特殊,但是我想不出另一种不会增加太多复杂性的方法。
代码如下所示。它似乎有效(至少在本地),但我不确定是否有更自然的方法来做到这一点。我还没有确定我是否真的需要包含
useFlowIdAsPrefix()
以及这是否会对 afterConnectionClosed()
调用产生影响,该调用只是执行 flowContext.remove()
。
@Bean
public ServerWebSocketContainer serverContainer(MongoTemplate mongoTemplate, IntegrationFlowContext flowContext) {
ServerWebSocketContainer container = new ServerWebSocketContainer("/wsflow").withSockJs(
new SockJsServiceOptions().setHeartbeatTime(5000L).setTaskScheduler(new TaskSchedulerBuilder().build()));
container.setInterceptors(new HandshakeInterceptor() {
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {}
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String conversationId = ((ServletServerHttpRequest)request).getServletRequest().getParameter("conversationId");
attributes.put("conversationId", conversationId);
return true;
}
});
container.setDecoratorFactories(handler -> new WebSocketHandlerDecorator(handler) {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
String conversationId = session.getAttributes().get("conversationId").toString();
IntegrationFlow f = IntegrationFlow.from(MongoDb.inboundChannelAdapter(mongoTemplate.getMongoDatabaseFactory(),
Query.query(Criteria.where("conversationId").is(conversationId).and("sessions").nin(session.getId())))
.collectionName("test").entityClass(Document.class)
.update(new Update().push("sessions", session.getId())),
p -> p.poller(pm -> pm.fixedDelay(1000L)))
.split()
.enrichHeaders(s -> s.header(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId()))
.channel("wsMessages")
.get();
flowContext.registration(f).id(session.getId()).useFlowIdAsPrefix().register();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
super.afterConnectionClosed(session, closeStatus);
flowContext.remove(session.getId());
}
});
return container;
}
@Bean
public MessageHandler webSocketOutboundAdapter(ServerWebSocketContainer container) {
return new WebSocketOutboundMessageHandler(container);
}
@Bean
public IntegrationFlow wsOut(MessageHandler webSocketOutboundAdapter) {
return IntegrationFlow.from("wsMessages")
.handle(webSocketOutboundAdapter).get();
}
@Bean
public QueueChannelSpec wsMessages() {
return MessageChannels.queue();
}
既然你谈论持久性和不重复,我不认为你关于“已见过的会话”的想法有那么糟糕。
我想不出比你已经拥有的更好的解决方案。 因此,MongoDB 中有一个文档,它属于某个会话,并且具有已看到该消息的会话列表的属性。因此,您已经对其进行了足够的优化,不会从数据库中提取那些已处理的消息。即使重新启动后,你仍然会很好。但是,您可能还需要考虑从该列表中删除已关闭的会话。我不确定来自同一客户端的新会话是否会具有相同的
id
来永远保留它。