我是 Spring boot websocket 和消息传递语义的新手。目前我可以使用以下代码发送私人消息。
String queueName = "/user/" + username + "/queue/wishes";
simpMessagingTemplate.convertAndSend(queueName, message);
当尝试使用convertAndSendToUser时,我没有收到任何错误,但消息没有发送。我知道使用 sendToUser 应该对目标名称的形成方式稍作更改,但我没有得到正确的结果。
String queueName = "/user/queue/wishes";
simpMessagingTemplate.convertAndSendToUser(username, queueName, message);
以下是我的订阅代码。
stompClient.subscribe('/user/queue/wishes', function(message) {
alert(message);
})
我有一个类似的问题,如果你的
username
实际上是sessionId
,那么尝试使用接受标头的重载方法之一(在SimpMessageSendingOperations javadoc中如此说):
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(username);
headerAccessor.setLeaveMutable(true);
messagingTemplate.convertAndSendToUser(username, "/queue/wishes", message, headerAccessor.getMessageHeaders());
您必须注册SimpleBroker的前缀:
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic", "/queue", "/user");
registry.setUserDestinationPrefix("/user");
}
虽然 UserDestinationPrefix 已设置为默认值“/user/”,但您必须添加 SimpleBroker 前缀。
它没有被发送,因为您的“目的地”,即“队列名称”不正确。 正如您在此处 SimpMessagingTemplate.java#L230 所看到的,这是在调用
template.convertAndSendToUser()
时作为链的一部分被调用的方法。
此方法已经为最终目的地添加了前缀
/user
。所以你应该做这样的事情:
simpMessagingTemplate.convertAndSendToUser(username,"/queue/wishes", message);
现在有了正确的目的地,它应该被发送给用户。
@配置 @EnableWebSocketMessageBroker @Order(Ordered.HIGHEST_PRECEDENCE + 99) 公共类 WebSocketConfiguration 实现 WebSocketMessageBrokerConfigurer {
@Autowired
private YAMLConfiguration myConfig;
@Autowired
private ApplicationContext context;
@Override
public void registerStompEndpoints ( StompEndpointRegistry registry ) {
//@formatter:off
registry.addEndpoint ( "/our-websocket" )
.setAllowedOrigins ( myConfig.getAllowedOrigins ( ).toArray(String[]::new) );
//@formatter:on
}
@Override
public void configureClientInboundChannel ( ChannelRegistration registration ) {
registration.interceptors ( wsMyCustomChannelInterceptor ( ) );
}
@Bean
public WebSocketMyCustomChannelInterceptor wsMyCustomChannelInterceptor ( ) {
return new WebSocketMyCustomChannelInterceptor ( );
}
@Override
public void configureMessageBroker ( MessageBrokerRegistry registry ) {
registry.setPreservePublishOrder ( true );
registry.setApplicationDestinationPrefixes ( "/ws" );
registry.enableSimpleBroker ( "/topic", "/queue" );
}
}
@Slf4j 公共类 WebSocketMyCustomChannelInterceptor 实现 ChannelInterceptor {
@Autowired
private JwtTokenProvider jwtTokenProvider;
@Autowired
private UserService userService;
@Override
public Message < ? > preSend ( Message < ? > message, MessageChannel channel ) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor ( message, StompHeaderAccessor.class );
if ( StompCommand.CONNECT.equals ( accessor.getCommand ( ) ) ) {
Map < String, Object > nativeHeaders = ( Map < String, Object > ) accessor.getHeader ( "nativeHeaders" );
ArrayList authorization = ( ArrayList ) nativeHeaders.get ( "Authorization" );
String beareraalchain = ( String ) authorization.get ( 0 );
String bearername = beareraalchain.split ( " " )[ 0 ];
String token = beareraalchain.split ( " " )[ 1 ];
String username = jwtTokenProvider.getSubject ( token );
if ( !jwtTokenProvider.isTokenValid ( username, token ) )
return null;
accessor.setUser ( new UserPrincipal ( username ) );
log.info ( username + " connected to channel" );
}
return message;
}
}
@服务 公共类NotificationService实现I_TDJCommandExecutionListener {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public NotificationService ( SimpMessagingTemplate messagingTemplate ) {
this.messagingTemplate = messagingTemplate;
}
...
public void sendNotification_TableCreated ( TableDeJeux tableDeJeux ) {
if ( tableDeJeux.getIsPublic ( ) ) {
messagingTemplate.convertAndSend ( "/topic/TableCreated", tableDeJeux );
}
else {
List < InvitedUserInfo > invitedUsersInfos = tableDeJeux.getTableDeJeuxConfiguration ( ).getInvitedUsersInfos ( );
Iterator < InvitedUserInfo > it = invitedUsersInfos.iterator ( );
while ( it.hasNext ( ) ) {
InvitedUserInfo iui = it.next ( );
messagingTemplate.convertAndSendToUser ( iui.getUsername ( ), "/queue/TableCreated", tableDeJeux );
}
}
}
...
}
... 让 sub4 = this.rxStompService .watch('/user/queue/TableCreated') 。订阅 ( { 下一个:(消息:消息)=> { // @ts-忽略 让 tablecreated : TableDeJeuxModel = plainToInstance ( TableDeJeuxModel, JSON.parse ( message.body ) ) this.tablesDeJeux.updateOrAdd(表创建) if (tablecreated.createur?.username===this.currentUser?.username) { this.notificationService.notify (NotificationType.SUCCESS, "太棒了,你已经创建了 table "" + tablecreated.nomTable + "你已经邀请了 évidemment automatiquement !" ) } 别的 { this.notificationService.notify (NotificationType.SUCCESS, "La table "" + tablecreated.nomTable + "" a été créée par "" + tablecreated.createur?.username + "", la table est privée, vous y êtes invité !" ) } }, 完成:() => { console.log(“与/user/topic/TableCreated断开连接”) //sub2.取消订阅() } } );
this.subscriptions.push ( sub4 )
...
终于找到问题所在了。我犯了一个愚蠢的错误。我正在填写 User DestinationPrefix 作为 WebSocket 配置的一部分。但没有为注入的 bean SimpMessaging 模板进行设置。