spring websocket rabbitmq 多服务器环境下的外部中继无法扩展

问题描述 投票:0回答:0

在我的应用程序中,我有一个前端通过网关连接到同一微服务的两个实例,配置了一个外部 rabbitmq 代理, 如果我只启动一个 ms 实例,一切都很好,但是当我部署 2 个实例时,会出现以下情况:

  1. 用户登录并与 ms A 建立 websocket 连接

  2. 用户触发 web rest 端点,将负载平衡到 ms A,ms 从 websocket 用户注册表中获取简单会话 ID,并向订阅的频道发送消息,没有错误

  3. 用户客户端按预期看到 websocket 数据

  4. 用户再次触发相同的 web rest 端点,将负载平衡到 ms B,ms 从 websocket 用户注册表中获取简单会话 ID,并向订阅的频道发送消息,没有错误

  5. 请求永远不会到达代理或客户端/浏览器,日志在 StompBrokerRelayMessageHandler 中显示以下内容:

No TCP connection for session oxsyxbwp in GenericMessage [payload=byte[1094], headers={simpMessageType=MESSAGE, nativeHeaders={simpOrigDestination=[/user/exchange/amq.direct/get-notifications]}, contentType=application/json, simpSessionId=oxsyxbwp, simpDestination=/exchange/amq.direct/get-notifications-useroxsyxbwp}]

并深入研究源代码,我发现如果 tcp 连接不存在,代理会在不发送消息的情况下返回

预期

能够成功地为用户 websocket 订阅发布消息,即使是从没有创建到客户端的 websocket 连接/会话的微服务实例,因为我使用的是外部代理,而 spring 应该提醒另一个据我所知,“拥有”websocket 会话的服务器?否则,为什么有人会为外部经纪人中继而烦恼?

这是相关配置

   @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic", "/queue", "/exchange/")
            .setAutoStartup(Boolean.TRUE)
            .setSystemHeartbeatReceiveInterval(50000)
            .setSystemHeartbeatSendInterval(50000)
            .setRelayHost(applicationProperties.getWebsocket().getRelayhost())
            .setRelayPort(applicationProperties.getWebsocket().getRelayport())
            .setClientLogin(applicationProperties.getWebsocket().getLogin())
            .setSystemLogin(applicationProperties.getWebsocket().getLogin())
            .setSystemPasscode(applicationProperties.getWebsocket().getPasscode())
            .setClientPasscode(applicationProperties.getWebsocket().getPasscode())
            .setTcpClient(createTcpClient())

            .setUserDestinationBroadcast("/topic/unresolved.user.dest")
            .setUserRegistryBroadcast("/topic/registry.broadcast");

        config.setPathMatcher(new AntPathMatcher("."));
        config.setUserDestinationPrefix("/user");

    }
   private ReactorNettyTcpClient<byte[]> createTcpClient() {

        return new ReactorNettyTcpClient<>(applicationProperties.getWebsocket().getRelayhost(),
            applicationProperties.getWebsocket().getRelayport(),
            new StompReactorNettyCodec());
   }

这就是我发布消息的方式:

public static final String CHANNEL_NAME = "/exchange/amq.direct/get-notifications";
public static final String NOTIFICATION_DEST = "/user/exchange/amq.direct/get-notifications";

 retrieveWebsocketSessionId(jhiUserId, users)
                .ifPresent(sessionId ->
                    // send to user
                    messagingTemplate.convertAndSendToUser(
                        sessionId,
                        CHANNEL_NAME,
                        notificationList,
                        simpMessageHelper.createHeaders(sessionId)
                    )
                );

    private Optional<String> retrieveWebsocketSessionId(String jhiUserId, Set<SimpUser> users) {
        for (SimpUser user : users) {
            if (user.getName().equals(jhiUserId)) {
                for (SimpSession session : user.getSessions()) {
                    // find if that session has our subscription associated
                    for (SimpSubscription sub : session.getSubscriptions()) {
                        if (sub.getDestination().equals(NOTIFICATION_DEST)) {
                            log.info("user {} is logged in. forwarding message to destination {} with session Id: {}", jhiUserId, sub.getDestination(), session.getId());
                            return Optional.of(session.getId());
                        }
                    }
                }
            }
        }
        log.info("user {} currently is not logged in or he hasn't a websocket connection yet. discard message", jhiUserId);
        return Optional.empty();
    }

提前感谢您的任何指示或说明

附注版本:spring-boot-starter-websocket .7.3\spring-boot-starter-websocket-2.7.3.pom

编辑: 作为一种“解决方法”,我改变了我的流程如下:微服务的每个实例都从队列中获取通知消息(实际上在我的完整系统中,其余 api 调用在 rabbitmq 队列中发送消息,通知微服务读取消息,执行一些逻辑并将其推送到 websocket 连接)

然后实例检查用户是否实际连接到 websocket 本地注册表,如果是,则将消息推送到 websocket,否则什么都不做就退出

这就是我收集有关连接到本地实例的用户的信息的方式:

  // map of jhiUserId / simpSessionId, or null if not connected to the instance
    private Map<String, String> connectedSessionsMap = new ConcurrentHashMap<>();
    @EventListener
    public void handleBrokerAvailabilityEvent(BrokerAvailabilityEvent event) {
        log.info("Broker rabbitmq available: {}", event.isBrokerAvailable());
        connectedSessionsMap.clear();
    }
    @EventListener
    public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
        StompHeaderAccessor headers = StompHeaderAccessor.wrap(event.getMessage());
        Principal principal = headers.getUser();
        if (principal != null) {
            String username = principal.getName();
            log.info("Websocket session disconnected, user: {} sessionId: {}, populating connectedSessionsMap cache", username, headers.getSessionId());
            connectedSessionsMap.put(username, null);
        } else {
            log.warn("WARNING: no principal found for ws disconnected event: {} , message: {}", event, event.getMessage());
        }
    }

    @EventListener
    public void handleSessionConnectEvent(SessionConnectEvent event) {
        StompHeaderAccessor headers = StompHeaderAccessor.wrap(event.getMessage());
        Principal principal = headers.getUser();
        if (principal != null) {
            String username = principal.getName();
            log.info("Websocket session connect, user: {} sessionId: {}, populating connectedSessionsMap cache", username, headers.getSessionId());
            connectedSessionsMap.put(username, headers.getSessionId());
        } else {
            log.warn("WARNING: no principal found for ws connect event: {} , message: {}", event, event.getMessage());
        }

    }
java spring spring-boot websocket rabbitmq
© www.soinside.com 2019 - 2024. All rights reserved.