How to wait for a Spring WebSocketStompClient to connect

Question (votes: 3, answers: 3)

I am following this guide to implement a simple STOMP client:

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);

ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

stompClient.setMessageConverter(new StringMessageConverter());

StompSessionHandler sessionHandler = new MySessionHandler();
stompClient.connect("ws://server/endpoint", sessionHandler);

// WAITING HERE

When the connection completes, it is supposed to be reported asynchronously to MySessionHandler:

public class MySessionHandler extends StompSessionHandlerAdapter
{
     @Override
     public void afterConnected(StompSession session, StompHeaders connectedHeaders) 
     {
         // WAITING FOR THIS
     }
}

So the question is: how should the WAITING HERE line wait for the WAITING FOR THIS line? Is there a Spring-specific way to do this? If not, which general Java approach fits best here?

java spring multithreading concurrency spring-websocket
3 Answers
3
votes

Perhaps java.util.concurrent.CountDownLatch can solve your problem, like this:

CountDownLatch latch = new CountDownLatch(1);
StompSessionHandler sessionHandler = new MySessionHandler(latch);
stompClient.connect("ws://server/endpoint", sessionHandler);
// block here until the latch count reaches zero
latch.await();

Your MySessionHandler implementation:

public class MySessionHandler extends StompSessionHandlerAdapter {
    private final CountDownLatch latch;

    public MySessionHandler(final CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void afterConnected(StompSession session, 
                               StompHeaders connectedHeaders) {
        try {
            // do some work here
        } finally {
            latch.countDown();
        }
    }
}
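
One caveat with a bare latch.await(): if the connection never succeeds, afterConnected is never called and the wait never returns. Below is a minimal JDK-only sketch of a bounded wait. The class LatchWaitDemo and the method awaitConnected are hypothetical names, and a plain thread stands in for the Spring thread that would call afterConnected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWaitDemo {

    // Waits up to timeoutMillis for the latch to reach zero.
    // Returns true if the callback fired in time, false on timeout or interrupt.
    static boolean awaitConnected(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);

        // Stand-in for the thread that would invoke afterConnected().
        Thread callbackThread = new Thread(latch::countDown);
        callbackThread.start();

        boolean connected = awaitConnected(latch, 2000);
        System.out.println(connected ? "connected" : "timed out");
    }
}
```

In the real client you would pass the same latch to MySessionHandler as shown above and treat a false result as a failed connection attempt.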

3
votes

The latch-based solution works. Later I found that connect returns a ListenableFuture<StompSession>, so we can wait for the session to be created like this:

ListenableFuture<StompSession> future = 
                 stompClient.connect("ws://server/endpoint", sessionHandler);
StompSession session = future.get(); // <--- this line will wait just like afterConnected()
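
Since ListenableFuture extends java.util.concurrent.Future, the timed get(timeout, unit) overload is available as well. Here is a rough JDK-only sketch of a bounded wait with error handling. FutureWaitDemo and awaitSession are hypothetical names, and a CompletableFuture<String> stands in for the future returned by connect. As far as I know, Spring Framework 6 deprecates connect in favor of connectAsync, which returns a CompletableFuture<StompSession>, so the same helper would apply there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureWaitDemo {

    // Blocks until the future completes or the timeout elapses.
    // Any failure is rethrown as IllegalStateException with the cause attached.
    static <T> T awaitSession(Future<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Connection failed", e.getCause());
        } catch (TimeoutException e) {
            // Give up on the pending attempt.
            future.cancel(true);
            throw new IllegalStateException("Connection timed out", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by stompClient.connect(...).
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "session-1");
        System.out.println(awaitSession(future, 2000));
    }
}
```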

1
votes

You don't need a latch or anything else; the afterConnected method is only executed once the connection has been established.

My example:

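// The URL needs a scheme such as ws://; without it the connect call cannot resolve the endpoint.
URI stompUrlEndpoint = new URI("ws://localhost:8080/Serv/enpoint");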

ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
//Calls initialize() after the container applied all property values.
taskScheduler.afterPropertiesSet();

StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
List<Transport> transports = new ArrayList<>(2);

transports.add(new WebSocketTransport(webSocketClient));

SockJsClient sockJsClient = new SockJsClient(transports);   

WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);

stompClient.setMessageConverter(new SimpleMessageConverter());          // default converter: SimpleMessageConverter

// Configure a scheduler to use for heartbeats and for receipt tracking. 
stompClient.setTaskScheduler(taskScheduler);

StompSessionHandlerImp stompSessionHandlerImp = new StompSessionHandlerImp();

ListenableFuture<StompSession> stompSessionFuture = stompClient.connect(stompUrlEndpoint.toString(), stompSessionHandlerImp);

// stompSession is assumed to be declared elsewhere (for example as a field).
try {
    // Block for at most 10 seconds while the connection is established.
    stompSession = stompSessionFuture.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers can observe the interruption.
    Thread.currentThread().interrupt();
} catch (ExecutionException e) {
    // The connection attempt itself failed.
    e.printStackTrace();
} catch (TimeoutException e) {
    // No connection was established within 10 seconds.
    e.printStackTrace();
}


private class StompSessionHandlerImp extends StompSessionHandlerAdapter {
    private StompSession session;

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // Keep a reference to the session for later use.
        this.session = session;

        session.setAutoReceipt(true);
        session.subscribe("/user/queue/bancaria.readcard", new StompFrameHandler() {
            ...
        });
    }
}
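
Because afterConnected only runs on success, a waiting thread also needs some way to learn about failure. One possible approach, sketched here with JDK classes only, is to complete a CompletableFuture from the success callback and fail it from the error callback. ConnectionSignal and its methods are hypothetical names, not part of Spring, and the sketch assumes connection failures reach the handler through handleTransportError. A String stands in for the StompSession.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Spring: turns success and failure
// callbacks into a single future the calling thread can wait on.
class ConnectionSignal {
    private final CompletableFuture<String> result = new CompletableFuture<>();

    // Call from afterConnected(...); the String stands in for the session.
    void connected(String sessionId) {
        result.complete(sessionId);
    }

    // Call from handleTransportError(...) so waiters fail fast.
    void failed(Throwable error) {
        result.completeExceptionally(error);
    }

    // Returns the session id, or throws if the connection failed or timed out.
    String await(long timeoutMillis) {
        try {
            return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Connection failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("Connection timed out", e);
        }
    }

    public static void main(String[] args) {
        ConnectionSignal signal = new ConnectionSignal();
        // Stand-in for the I/O thread reporting a failed handshake.
        new Thread(() -> signal.failed(new java.io.IOException("refused"))).start();
        try {
            signal.await(2000);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```

Compared with a latch, this lets the waiting thread distinguish "connected", "failed" and "timed out" instead of only "done" or "still waiting".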
