所以我有一个列表,从中获得并行流来填写地图,如下所示:
Map<Integer, TreeNode> map = new HashMap<>();
List<NodeData> list = some_filled_list;
//Putting data from the list into the map
list.parallelStream().forEach(d -> {
TreeNode node = new TreeNode(d);
map.put(node.getId(), node);
});
//print out map
map.entrySet().stream().forEach(entry -> {
System.out.println("Processing node with ID = " + entry.getValue().getId());
});
这段代码的问题是,当“放入数据”过程仍在进行时,地图正在被打印出来(因为它是并行的),因此,地图尚未收到列表中的所有元素。当然,在我的真实代码中,它不仅仅是打印出地图;而是打印出地图。我使用地图来利用 O(1) 查找时间。
我的问题是:
如何让主线程等待,以便在地图打印出来之前“放入数据”完成?我尝试将“放入数据”放入线程 t 中,并执行
t.start()
和 t.join()
,但这没有帮助。也许在这种情况下我不应该使用并行流?清单很长,我只是想利用并行性来提高效率。
使用此
list.parallelStream().forEach
,您违反了 Stream 文档中明确规定的 side-effects 属性。
此外,当您说此代码是在“放置数据”过程仍在进行时正在打印地图时(因为它是并行的),这是不正确的,因为
forEach
是终端操作,它将等待完成,直到它可以处理下一行。您可能会看到,因为您正在收集到非线程安全的HashMap
并且某些条目可能不在该映射中...考虑其他方式,如果您放置多个条目会发生什么来自 HashMap
中的多个线程?好吧,很多事情都可能会被破坏,比如缺少条目、地图不正确/不一致等。
ConcurrentMap
这样的 ConcurrentHashMap
是可行的,因为它是线程安全的。但是你仍然违反了副作用属性,尽管是以“安全”的方式。
正确的做法是直接
collect
到 Map
而无需 forEach
:
Map<Integer, TreeNode> map = list.parallelStream()
.collect(Collectors.toMap(
NodeData::getId,
TreeNode::new
));
这样,即使是并行处理,也一切正常。请注意,您需要lots(数万个元素)才能通过并行处理获得可测量的性能提升。
流操作将阻塞,直到并行和非并行实现完成为止。
所以你看到的不是
the "putting data" process is still going on
- 很可能只是数据损坏,因为 HashMap
不是线程安全的。
尝试使用 ConcurrentHashMap
代替。
我猜想,如果流仍然可以处理,你可以尝试类似的方法:
List<NodeData> list = new ArrayList<>();
//Putting data from the list into the map
Map<Integer, TreeNode> map = list.parallelStream()
.collect(Collectors.toMap(
n -> n.getId(),
n -> new TreeNode(n)
));
至少现在您在流上有一个终端。您将尽可能使用多个线程,并且映射肯定会完成。