Spring集成和Reactive WebSockets

问题描述 投票:1回答:2

Spring集成提供非反应性入站/出站WebSocket适配器,简单地说,通过内部容器将会话与id关联,对消息进行一些处理,在出站时,它检查会话ID的消息头,并通过该会话发送。

现在,Spring通过org.springframework.web.reactive.socket.WebSocketSession和其他类提供反应式WebSocket支持,我想知道对于反应式WebSocket堆栈的通道适配器有类似的支持。

如果没有,是否有任何常见的模式/实践,如何将反应性WS与弹簧整合消息流集成?

java spring websocket spring-integration spring-websocket
2个回答
1
投票

这个功能还没有被调用,所以我们没有考虑过这个问题。

请看看我的SandBox。这是我能够根据当前的情况建议的最好的。

我们只需遵循标准的Spring WebFlux建议即可实现WebSockets解决方案。所以,我们有一个WebSocketHandler实现与适当的URL映射。实施只是将Flux中的session.receive()转发到动态注册的IntegrationFlow。然后流量转换为用于Publisher的Reactive session.send()

我相信可以使用许多其他方法,例如使用来自这个FluxMessageChannel impl的subscribeTo() bean及其handle(WebSocketSession)Flux桥接到预定义的Integration流程中。或者简单的@MessagingGateway来自doOnNext()

但是,不确定session.send()是否可以独立使用下游(需要播放),但是您可以在示例中看到我如何将WebSocketSession传播到MessageHeaders以便在Integration流程中进行访问。


0
投票

感谢您的见解,Artem,他们帮了很多忙。

这是我最终做的事情:

从处理程序,我只是将收到的消息发送到我的频道接受它们:

public class FakeWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        session.receive()
            .map(wsm -> MessageBuilder.withPayload(wsm).build())
            .subscribe(subscriberChannel::send);

        return Mono.never();
    }
}

下游,在我的流程结束时,在我的最终服务激活器中,我发送响应:

public class FakeResponder extends AbstractMessageHandler {
    @Override
    protected void handleMessageInternal(Message<?> message) {
        final WebSocketSession session = ...; // obtain WebSocketSession for this message

        session.send(Mono.just(message.getPayload())
            .map(this::convertToSomeByteRepresentation)
            .map(bb -> session.binaryMessage(dbf -> dbf.wrap(bb)))
        ).subscribe();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.