Java 17 Fork join池限制池中线程数

问题描述 投票:0回答:1

在 java 17 中,我们看到与旧的 java 版本(在我的例子中是 java 8)相比,创建了更多的线程(尽管不是同时执行)。虽然线程本身可能不是一个大问题,但在每个线程中都会建立数据库连接,因此我们最终打破了数据库客户端的最大并发会话限制。

public class TestForkJoin {

    public static void main(String[] args) {
        ClassA a = new ClassA();
        System.out.println(a.fetchAll(Collections.singleton("ok")));
    }

    public abstract static class Base {
        public Map<String, String> fetchAll(Set<String> ids) {
            return ids.stream().parallel().map(i -> ImmutablePair.of(i, fetch(i))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        }

        public abstract String fetch(String id);
    }

    public static class ClassA extends Base {

        @Override
        public String fetch(String id) {
            ClassB b = new ClassB();
            Map<String, String> result = b.fetchAll(Collections.singleton(id));
            return String.join(":", result.values());
        }
    }

    public static class ClassB extends Base {

        @Override
        public String fetch(String id) {
            return id;
        }
    }
}

在上面的例子中,我们递归地调用.parallel()。例如

A.fetchAll() -> .parallel()  -> A.fetch() -> B.fetchAll() -> .parallel(). 

深入研究这一点。 .parallel() 使用 forkJoinPool.commonpool() ,其并行性由机器上的核心数或 java.util.concurrent.ForkJoinPool.common.parallelism 控制。

在java 8中,我看到只有8个线程同时执行,池大小也是8。但是在java17中,虽然并行度为8,但池大小高达150。以下是一些选项我评价过

  1. 不使用 fork join 池,而是使用具有固定核心池大小的执行器服务

    但是这个选项对我们不起作用,因为如果 classA.fetchAll 上有更多的并发请求,那么线程将陷入死锁状态。事实上,这是使用 fork join pool 相对于执行器服务的主要优势之一。参见这里

  2. 不要使用 forkjoin 公共池,而是使用自定义 fork join 池,并限制池大小 每当达到池大小限制时,就会抛出异常。所以这实际上并不能解决任何问题。

    java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    

基本上我想要无限大小的队列,但池中的线程数量不应增加。

这个openjdk问题似乎与此有关。

java java.util.concurrent forkjoinpool
1个回答
0
投票

从下面的文本中我了解到,在 Java 17 中,线程池中的线程比以前使用的 java 版本中的线程更多。由于每个线程单独连接到数据库,因此并行数据库连接过多,应用程序会出现问题。

使用并行流处理 (

.stream().parallel()
),您无法控制有多少线程将处理该问题。相反,您可以使用执行器来更好地控制。

ExecutorService es = Executors.newFixedThreadPool(5); // assuming five threads is your limit
A.fetchAll().stream().forEach(a -> {
   es.submit(new Runnable(){
       B.fetchAll().stream().forEach(b -> {
           es.submit(new Runnable() {
               // do the real work
           });
       });
   });
}

在这种情况下,您肯定知道执行的线程不会超过 5 个 - 即使构建队列的嵌套任务也是从这 5 个线程中运行的。

© www.soinside.com 2019 - 2024. All rights reserved.