Spring集成提供非反应性入站/出站WebSocket适配器,简单地说,通过内部容器将会话与id关联,对消息进行一些处理,在出站时,它检查会话ID的消息头,并通过该会话发送。
现在,Spring通过org.springframework.web.reactive.socket.WebSocketSession和其他类提供反应式WebSocket支持,我想知道对于反应式WebSocket堆栈的通道适配器有类似的支持。
如果没有,是否有任何常见的模式/实践,如何将反应性WS与弹簧整合消息流集成?
这个功能还没有被调用,所以我们没有考虑过这个问题。
请看看我的SandBox。这是我能够根据当前的情况建议的最好的。
我们只需遵循标准的Spring WebFlux建议即可实现WebSockets解决方案。所以,我们有一个WebSocketHandler
实现与适当的URL映射。实施只是将Flux
中的session.receive()
转发到动态注册的IntegrationFlow
。然后流量转换为用于Publisher
的Reactive session.send()
。
我相信可以使用许多其他方法,例如使用来自这个FluxMessageChannel
impl的subscribeTo()
bean及其handle(WebSocketSession)
将Flux
桥接到预定义的Integration流程中。或者简单的@MessagingGateway
来自doOnNext()
。
但是,不确定session.send()
是否可以独立使用下游(需要播放),但是您可以在示例中看到我如何将WebSocketSession
传播到MessageHeaders
以便在Integration流程中进行访问。
感谢您的见解,Artem,他们帮了很多忙。
这是我最终做的事情:
从处理程序,我只是将收到的消息发送到我的频道接受它们:
public class FakeWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
session.receive()
.map(wsm -> MessageBuilder.withPayload(wsm).build())
.subscribe(subscriberChannel::send);
return Mono.never();
}
}
下游,在我的流程结束时,在我的最终服务激活器中,我发送响应:
public class FakeResponder extends AbstractMessageHandler {
@Override
protected void handleMessageInternal(Message<?> message) {
final WebSocketSession session = ...; // obtain WebSocketSession for this message
session.send(Mono.just(message.getPayload())
.map(this::convertToSomeByteRepresentation)
.map(bb -> session.binaryMessage(dbf -> dbf.wrap(bb)))
).subscribe();
}
}