我正在使用 STOMP over websockets 和 Spring Boot。是否可以向特定订阅发送消息?我根据 stomp 文档 使用包含 id 字段的 STOMP 标头订阅 STOMP 端点,我希望使用此 id 来确定应该接收消息的客户端,但 spring 似乎不使用此 id。我不能只使用 sendToUser 因为两个客户端可以具有相同的用户 ID,例如如果用户有两个打开的浏览器窗口。只有一个特定窗口应该收到该消息。
在下面的示例中,我有两个连接的客户端,它们使用相同的用户,但 STOMP 标头中的 id 不同。
客户端1-ID:a32d66bf-03c7-47a4-aea0-e464c0727842
客户端2-ID:b3673d33-1bf2-461e-8df3-35b7af07371b
在春季,我执行了以下 Kotlin 代码:
val subscriptions = userRegistry.findSubscriptions {
it.destination == "/user/topic/operations/$operationId/runs"
}
subscriptions.forEach{
println("subscription id: ${it.id}");
println("session id: ${it.session.id}");
println("user id ${it.session.user.name}");
}
输出:
subscription id: sub-7
session id: mcjpgn2i
user id 4a27ef88-25eb-4175-a872-f46e7b9d0564
subscription id: sub-7
session id: 0dxuvjgp
user id 4a27ef88-25eb-4175-a872-f46e7b9d0564
没有任何迹象表明我已传递给 stomp 标头。
是否可以将消息发送到由我随标头传递的 id 确定的特定订阅?
我能够使用
clientOutboundChannel
的 simpleBrokerMessageHandler
(而不是 SimpMessagingTemplate
)发送到特定订阅。注射用
@Autowired
@Qualifier("simpleBrokerMessageHandler")
private AbstractBrokerMessageHandler brokerMessageHandler;
然后,显式地将订阅、会话和目标设置为标头,然后直接在通道上执行
send
命令:
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create();
headerAccessor.setSubscriptionId(desiredSubscriptionId);
headerAccessor.setSessionId(desiredSessionId);
headerAccessor.setDestination(destination);
GenericMessage<byte[]> message = new GenericMessage<>(
payloadAsString.getBytes(StandardCharsets.UTF_8),
headerAccessor.getMessageHeaders());
brokerMessageHandler.getClientOutboundChannel().send(message);
我成功了。
首先,我的客户端设置有问题。我已经在连接标头中设置了订阅 ID,如下所示:
this.stompClient.webSocketFactory = (): WebSocket => new SockJS("/ws");
this.stompClient.connectHeaders = { id: subscriptionId };
this.stompClient.activate();
但是必须在订阅头中设置订阅头:
this.stompClient.subscribe(this.commonEndpoint,
this.onMessageReceived.bind(this),
{ id: subScriptionId });
如果我这样做,spring 会正确地使用这个 id 作为订阅 id,而不是使用像 sub-7 这样的默认值。
根据该线程我可以将消息发送到特定会话而不是用户。
使用以下代码,我可以向特定订阅发送消息:
val subscriptions = userRegistry.findSubscriptions {
it.destination == "/user/topic/operations/$operationId/runs"
}
subscriptions.forEach {
if(it.id === mySubscriptionId){
val headerAccessor =
SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE)
headerAccessor.sessionId = it.session.id
headerAccessor.setLeaveMutable(true)
simpMessagingTemplate.convertAndSendToUser(it.session.id,
"/topic/operations/runs", messageResponseEntity,
headerAccessor.getMessageHeaders())
}
}
我可以使用
SimpMessagingTemplate
向特定订阅发送消息。
@Autowired
private final SimpMessagingTemplate messagingTemplate;
public void sendMessage(String simpUserId, String destination, String message) {
try {
messagingTemplate.convertAndSendToUser(simpUserId, destination, message);
} catch (Exception ex) {
LOG.error("Exception occurred while sending message [message={}]", ex.getMessage(), ex);
}
}