Java并行流:如何等待线程完成并行流?

问题描述 投票:0回答:3

所以我有一个列表,从中获得并行流来填写地图,如下所示:

Map<Integer, TreeNode> map = new HashMap<>();
List<NodeData> list = some_filled_list;

//Putting data from the list into the map
list.parallelStream().forEach(d -> {
                TreeNode node = new TreeNode(d);
                map.put(node.getId(), node);
            });

//print out map
map.entrySet().stream().forEach(entry -> {
     System.out.println("Processing node with ID = " + entry.getValue().getId());
                });

这段代码的问题是,当“放入数据”过程仍在进行时,地图正在被打印出来(因为它是并行的),因此,地图尚未收到列表中的所有元素。当然,在我的真实代码中,它不仅仅是打印出地图;而是打印出地图。我使用地图来利用 O(1) 查找时间。

我的问题是:

  1. 如何让主线程等待,以便在地图打印出来之前“放入数据”完成?我尝试将“放入数据”放入线程 t 中,并执行

    t.start()
    t.join()
    ,但这没有帮助。

  2. 也许在这种情况下我不应该使用并行流?清单很长,我只是想利用并行性来提高效率。

java multithreading collections parallel-processing java-stream
3个回答
25
投票

使用此

list.parallelStream().forEach
,您违反了 Stream 文档中明确规定的 side-effects 属性。

此外,当您说此代码是在“放置数据”过程仍在进行时正在打印地图时(因为它是并行的),这是不正确的,因为

forEach
是终端操作,它将等待完成,直到它可以处理下一行。您可能会看到,因为您正在收集到非线程安全的
HashMap
并且某些条目可能不在该映射中...考虑其他方式,如果您放置多个条目会发生什么来自
HashMap
中的多个线程?好吧,很多事情都可能会被破坏,比如缺少条目、地图不正确/不一致等。

当然,将其更改为像

ConcurrentMap
这样的
ConcurrentHashMap
是可行的,因为它是线程安全的。但是你仍然违反了副作用属性,尽管是以“安全”的方式。

正确的做法是直接

collect
Map
而无需
forEach
:

Map<Integer, TreeNode> map = list.parallelStream()
        .collect(Collectors.toMap(
                NodeData::getId,
                TreeNode::new
        ));

这样,即使是并行处理,也一切正常。请注意,您需要lots(数万个元素)才能通过并行处理获得可测量的性能提升。


5
投票

流操作将阻塞,直到并行和非并行实现完成为止。

所以你看到的不是

the "putting data" process is still going on
- 很可能只是数据损坏,因为
HashMap
不是线程安全的。 尝试使用
ConcurrentHashMap
代替。


2
投票

我猜想,如果流仍然可以处理,你可以尝试类似的方法:

    List<NodeData> list = new ArrayList<>();

    //Putting data from the list into the map
    Map<Integer, TreeNode> map = list.parallelStream()
            .collect(Collectors.toMap(
                    n -> n.getId(),
                    n -> new TreeNode(n)
            ));

至少现在您在流上有一个终端。您将尽可能使用多个线程,并且映射肯定会完成。

© www.soinside.com 2019 - 2024. All rights reserved.