我有一个listenner方法来处理Spring云流实现的消息,如下所示:
@StreamListener(value = MyInterface.INPUT)
public void handleMsg(@Payload Foo foo) {
// if (concurrentHashMap.containsKey(foo.getId())) concurrentHashMap.remove(foo.getId());
}
这是我的第二种方法,应该被之前的方法阻止:
public Foo getFoo(long fooId) {
// here I need block method with some mechanism until handleMsg remove received object from map and return this foo from there
return fooFromStream;
}
我的目标是从服务类调用getFoo
方法,如下所示:
// some logic
Foo foo = service.getFoo(fooId);
// some logic which required received foo;
我有想法将getFoo
方法中的Foo包装到AsyncResult中,然后调用方法get
关于导致块的Future结果,但我不知道如何将foo从流传递到方法getFoo
用例应该是这样的:
我调用方法getFoo
将foo发送到消息代理并在map中注册foo,然后执行一些逻辑,然后在命令完成时我在StreamListenner中接收消息,从map中删除foo,接下来我需要从方法getFoo
返回foo。
你能告诉我怎么做或解决它的最佳做法是什么?谢谢你的建议。
目前还不完全清楚你要做什么,但是Map<Long, BlockingQueue<Foo>
将允许你阻止take
(或者,poll
超时可能更好),直到听众offer
s Foo
;然后删除地图条目。
请记住,一旦Foo被放入队列,记录将被激活,如果服务器崩溃,它将丢失。
您可以使用并发Map of Long和Blocking队列阻塞队列:
ConcurrentMap<Long, BlockingQueue<Foo>> fooMap = new ConcurrentHashMap<>();
...
private BlockingQueue<Foo> getFooQueue(long fooId) {
return fooMap.computeIfAbsent(fooId, l -> new ArrayBlockingQueue<>(1));
}
...
@StreamListener(value = MyInterface.INPUT)
public void handleMsg(@Payload Foo foo) {
BlockingQueue<Foo> fq = getFooQueue(foo.getId());
synchronized(fq) {
fq.clear();
fq.add(foo);
}
}
...
public Foo getFoo(long fooId) throws InterruptedException {
BlockingQueue<Foo> fq = getFooQueue(fooId);
synchronized(fq) {
return fq.take();
}
}
这两个synchronized
块只有在有可能你的handleMsg
可以被多次调用时才需要,当一个当前可用的foo
应该用新的foo
覆盖。