[我使用Java8。我有一个事件处理程序,它以高速率(每秒n个)接受事件,当我收到很多事件时(在此简化示例中为1000),我想将它们刷新到存储中。
[第25行(myCache.get(event.getKey())。add(event.getBean());)是否出现可见性错误?我应该在handleEvent()方法上进行同步吗?
public class myClass extends MySimpleEventHanlder{
private Map<String, List<MyBean>> myCache;
private ScheduledExecutorService scheduler;
public void MyClass(){
myCache = new ConcurrentHashMap<String, List<MyBean>>();
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
for (Iterator<Map.Entry<String, List<MyBean>>> it = myCache.entrySet().iterator(); it.hasNext();) {
Map.Entry<String, List<MyBean>> entry = it.next();
if (entry.getValue().size() >= 1000) {
it.remove();
//do some more processing , flush to storage
}
}
}, 0, 60, TimeUnit.SECONDS);
}
@Override
public void handleEvent(Event event) {
if (myCachetCache.containsKey(event.getKey())) {
myCache.get(event.getKey()).add(event.getBean());
}
else{
List<MyBean> beans = new ArrayList<MyBeans>();
beans.add(event.getBean());
myCache.put(event.key, beans);
}
}
}
您肯定有可见性问题:您在一个线程中将项目添加到ArrayList中,并在另一个线程中从该ArrayList中读取size(),而它们之间没有同步。
[另一个问题是在调用myCache.containsKey
和myCache.get
之间可能会删除密钥。这将导致NullPointerException。这可以通过使用computeIfAbsent来解决,它可以保证是原子的。
myCache.computeIfAbsent(event.getKey(), key -> new ArrayList<>()).add(event.getBean());