在我的应用程序中,我有一个前端通过网关连接到同一微服务的两个实例,配置了一个外部 rabbitmq 代理, 如果我只启动一个 ms 实例,一切都很好,但是当我部署 2 个实例时,会出现以下情况:
用户登录并与 ms A 建立 websocket 连接
用户触发 web rest 端点,将负载平衡到 ms A,ms 从 websocket 用户注册表中获取简单会话 ID,并向订阅的频道发送消息,没有错误
用户客户端按预期看到 websocket 数据
用户再次触发相同的 web rest 端点,将负载平衡到 ms B,ms 从 websocket 用户注册表中获取简单会话 ID,并向订阅的频道发送消息,没有错误
请求永远不会到达代理或客户端/浏览器,日志在 StompBrokerRelayMessageHandler 中显示以下内容:
No TCP connection for session oxsyxbwp in GenericMessage [payload=byte[1094], headers={simpMessageType=MESSAGE, nativeHeaders={simpOrigDestination=[/user/exchange/amq.direct/get-notifications]}, contentType=application/json, simpSessionId=oxsyxbwp, simpDestination=/exchange/amq.direct/get-notifications-useroxsyxbwp}]
并深入研究源代码,我发现如果 tcp 连接不存在,代理会在不发送消息的情况下返回
预期:
能够成功地为用户 websocket 订阅发布消息,即使是从没有创建到客户端的 websocket 连接/会话的微服务实例,因为我使用的是外部代理,而 spring 应该提醒另一个据我所知,“拥有”websocket 会话的服务器?否则,为什么有人会为外部经纪人中继而烦恼?
这是相关配置
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic", "/queue", "/exchange/")
.setAutoStartup(Boolean.TRUE)
.setSystemHeartbeatReceiveInterval(50000)
.setSystemHeartbeatSendInterval(50000)
.setRelayHost(applicationProperties.getWebsocket().getRelayhost())
.setRelayPort(applicationProperties.getWebsocket().getRelayport())
.setClientLogin(applicationProperties.getWebsocket().getLogin())
.setSystemLogin(applicationProperties.getWebsocket().getLogin())
.setSystemPasscode(applicationProperties.getWebsocket().getPasscode())
.setClientPasscode(applicationProperties.getWebsocket().getPasscode())
.setTcpClient(createTcpClient())
.setUserDestinationBroadcast("/topic/unresolved.user.dest")
.setUserRegistryBroadcast("/topic/registry.broadcast");
config.setPathMatcher(new AntPathMatcher("."));
config.setUserDestinationPrefix("/user");
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(applicationProperties.getWebsocket().getRelayhost(),
applicationProperties.getWebsocket().getRelayport(),
new StompReactorNettyCodec());
}
这就是我发布消息的方式:
public static final String CHANNEL_NAME = "/exchange/amq.direct/get-notifications";
public static final String NOTIFICATION_DEST = "/user/exchange/amq.direct/get-notifications";
retrieveWebsocketSessionId(jhiUserId, users)
.ifPresent(sessionId ->
// send to user
messagingTemplate.convertAndSendToUser(
sessionId,
CHANNEL_NAME,
notificationList,
simpMessageHelper.createHeaders(sessionId)
)
);
private Optional<String> retrieveWebsocketSessionId(String jhiUserId, Set<SimpUser> users) {
for (SimpUser user : users) {
if (user.getName().equals(jhiUserId)) {
for (SimpSession session : user.getSessions()) {
// find if that session has our subscription associated
for (SimpSubscription sub : session.getSubscriptions()) {
if (sub.getDestination().equals(NOTIFICATION_DEST)) {
log.info("user {} is logged in. forwarding message to destination {} with session Id: {}", jhiUserId, sub.getDestination(), session.getId());
return Optional.of(session.getId());
}
}
}
}
}
log.info("user {} currently is not logged in or he hasn't a websocket connection yet. discard message", jhiUserId);
return Optional.empty();
}
提前感谢您的任何指示或说明
附注版本:spring-boot-starter-websocket .7.3\spring-boot-starter-websocket-2.7.3.pom
编辑: 作为一种“解决方法”,我改变了我的流程如下:微服务的每个实例都从队列中获取通知消息(实际上在我的完整系统中,其余 api 调用在 rabbitmq 队列中发送消息,通知微服务读取消息,执行一些逻辑并将其推送到 websocket 连接)
然后实例检查用户是否实际连接到 websocket 本地注册表,如果是,则将消息推送到 websocket,否则什么都不做就退出
这就是我收集有关连接到本地实例的用户的信息的方式:
// map of jhiUserId / simpSessionId, or null if not connected to the instance
private Map<String, String> connectedSessionsMap = new ConcurrentHashMap<>();
@EventListener
public void handleBrokerAvailabilityEvent(BrokerAvailabilityEvent event) {
log.info("Broker rabbitmq available: {}", event.isBrokerAvailable());
connectedSessionsMap.clear();
}
@EventListener
public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(event.getMessage());
Principal principal = headers.getUser();
if (principal != null) {
String username = principal.getName();
log.info("Websocket session disconnected, user: {} sessionId: {}, populating connectedSessionsMap cache", username, headers.getSessionId());
connectedSessionsMap.put(username, null);
} else {
log.warn("WARNING: no principal found for ws disconnected event: {} , message: {}", event, event.getMessage());
}
}
@EventListener
public void handleSessionConnectEvent(SessionConnectEvent event) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(event.getMessage());
Principal principal = headers.getUser();
if (principal != null) {
String username = principal.getName();
log.info("Websocket session connect, user: {} sessionId: {}, populating connectedSessionsMap cache", username, headers.getSessionId());
connectedSessionsMap.put(username, headers.getSessionId());
} else {
log.warn("WARNING: no principal found for ws connect event: {} , message: {}", event, event.getMessage());
}
}