I am using this guide to implement a simple STOMP client:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
stompClient.setMessageConverter(new StringMessageConverter());
StompSessionHandler sessionHandler = new MySessionHandler();
stompClient.connect("ws://server/endpoint", sessionHandler);
// WAITING HERE
When the connection completes, it is reported asynchronously to MySessionHandler:
public class MySessionHandler extends StompSessionHandlerAdapter
{
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders)
{
// WAITING FOR THIS
}
}
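So the question is: how should the line marked WAITING HERE wait for the line marked WAITING FOR THIS? Is there a Spring-specific way to do this? If not, which general-purpose Java approach fits best here?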
Perhaps a java.util.concurrent.CountDownLatch can solve your problem, like this:
CountDownLatch latch = new CountDownLatch(1);
StompSessionHandler sessionHandler = new MySessionHandler(latch);
stompClient.connect("ws://server/endpoint", sessionHandler);
// block here until the latch count reaches zero
latch.await();
Your MySessionHandler implementation:
public class MySessionHandler extends StompSessionHandlerAdapter {
private final CountDownLatch latch;
public MySessionHandler(final CountDownLatch latch) {
this.latch = latch;
}
@Override
public void afterConnected(StompSession session,
StompHeaders connectedHeaders) {
try {
// do here some job
} finally {
latch.countDown();
}
}
}
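One caveat with the latch: if the connection attempt fails, afterConnected is never called and latch.await() blocks forever. A minimal sketch of one way around that, using only the JDK, is to await with a timeout and to release the latch from the error path as well. LatchedHandler, onConnected and onError are hypothetical stand-ins here (not Spring classes); in the real handler they would correspond to afterConnected and handleTransportError.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the session handler; it models only the two callbacks that matter.
class LatchedHandler {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile Throwable failure;

    // In the real handler this would be done inside afterConnected(...)
    void onConnected() {
        latch.countDown();
    }

    // In the real handler this would be done inside handleTransportError(...)
    void onError(Throwable error) {
        failure = error;      // record the failure before releasing the waiter
        latch.countDown();
    }

    // Returns true only if the connected callback fired within the timeout.
    boolean awaitConnected(long timeout, TimeUnit unit) throws InterruptedException {
        boolean released = latch.await(timeout, unit);
        return released && failure == null;
    }
}

class LatchTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        LatchedHandler handler = new LatchedHandler();
        // Simulate the asynchronous callback arriving on another thread
        new Thread(handler::onConnected).start();
        System.out.println("connected: " + handler.awaitConnected(5, TimeUnit.SECONDS));
    }
}
```

With this shape the waiting thread gets a plain boolean back and can decide whether to retry or give up, instead of hanging when the server is unreachable.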
The latch solution works. Later I found that the connect method returns a ListenableFuture<StompSession>, so we can wait for the session to be created as follows:
ListenableFuture<StompSession> future =
stompClient.connect("ws://server/endpoint", sessionHandler);
StompSession session = future.get(); // blocks until the session is established, or throws if the connection attempt fails
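Since future.get() without arguments can block indefinitely and throws checked exceptions, it may help to wrap the wait in a small helper. The sketch below is plain JDK code: awaitResult is a hypothetical helper name, and a CompletableFuture<String> stands in for the session future so the example runs without Spring. Because ListenableFuture extends java.util.concurrent.Future, the same helper should accept the value returned by connect.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureWaitDemo {
    // Hypothetical helper: waits for the result, returning empty on failure or timeout.
    static <T> Optional<T> awaitResult(Future<T> future, long timeout, TimeUnit unit) {
        try {
            return Optional.ofNullable(future.get(timeout, unit));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException | TimeoutException e) {
            // The asynchronous task failed, or did not finish in time
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Stand-in for stompClient.connect(...): completes on another thread
        CompletableFuture<String> connecting = CompletableFuture.supplyAsync(() -> "session-1");
        Optional<String> session = awaitResult(connecting, 5, TimeUnit.SECONDS);
        System.out.println("connected: " + session.isPresent());
    }
}
```

Returning an Optional is just one design choice; rethrowing a single unchecked exception would work equally well if the caller cannot continue without a session.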
You don't need a latch or anything else: the afterConnected method is only invoked once the connection has been established.
My example:
URI stompUrlEndpoint = new URI("localhost:8080/Serv/enpoint");
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
//Calls initialize() after the container applied all property values.
taskScheduler.afterPropertiesSet();
StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
List<Transport> transports = new ArrayList<>(2);
transports.add(new WebSocketTransport(webSocketClient));
SockJsClient sockJsClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
stompClient.setMessageConverter(new SimpleMessageConverter()); // default converter: SimpleMessageConverter
// Configure a scheduler to use for heartbeats and for receipt tracking.
stompClient.setTaskScheduler(taskScheduler);
StompSessionHandlerImp stompSessionHandlerImp = new StompSessionHandlerImp();
ListenableFuture<StompSession> stompSessionFuture = stompClient.connect(stompUrlEndpoint.toString(), stompSessionHandlerImp);
try {
stompSession = stompSessionFuture.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Restore the interrupt flag so callers can still observe the interruption
Thread.currentThread().interrupt();
} catch (ExecutionException | TimeoutException e) {
// The connection attempt failed or did not complete within 10 seconds
e.printStackTrace();
}
private class StompSessionHandlerImp extends StompSessionHandlerAdapter {
private StompSession session;
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
this.session = session;
session.setAutoReceipt(true);
session.subscribe("/user/queue/bancaria.readcard", new StompFrameHandler() {
...
});
}
}