在 java 17 中,我们看到与旧的 java 版本(在我的例子中是 java 8)相比,创建了更多的线程(尽管不是同时执行)。虽然线程本身可能不是一个大问题,但在每个线程中都会建立数据库连接,因此我们最终打破了数据库客户端的最大并发会话限制。
public class TestForkJoin {
public static void main(String[] args) {
ClassA a = new ClassA();
System.out.println(a.fetchAll(Collections.singleton("ok")));
}
public abstract static class Base {
public Map<String, String> fetchAll(Set<String> ids) {
return ids.stream().parallel().map(i -> ImmutablePair.of(i, fetch(i))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
public abstract String fetch(String id);
}
public static class ClassA extends Base {
@Override
public String fetch(String id) {
ClassB b = new ClassB();
Map<String, String> result = b.fetchAll(Collections.singleton(id));
return String.join(":", result.values());
}
}
public static class ClassB extends Base {
@Override
public String fetch(String id) {
return id;
}
}
}
在上面的例子中,我们递归地调用.parallel()。例如
A.fetchAll() -> .parallel() -> A.fetch() -> B.fetchAll() -> .parallel().
深入研究这一点。 .parallel() 使用 forkJoinPool.commonpool() ,其并行性由机器上的核心数或 java.util.concurrent.ForkJoinPool.common.parallelism 控制。
在java 8中,我看到只有8个线程同时执行,池大小也是8。但是在java17中,虽然并行度为8,但池大小高达150。以下是一些选项我评价过
不使用 fork join 池,而是使用具有固定核心池大小的执行器服务
但是这个选项对我们不起作用,因为如果 classA.fetchAll 上有更多的并发请求,那么线程将陷入死锁状态。事实上,这是使用 fork join pool 相对于执行器服务的主要优势之一。参见这里
不要使用 forkjoin 公共池,而是使用自定义 fork join 池,并限制池大小 每当达到池大小限制时,就会抛出异常。所以这实际上并不能解决任何问题。
java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
基本上我想要无限大小的队列,但池中的线程数量不应增加。
这个openjdk问题似乎与此有关。
从下面的文本中我了解到,在 Java 17 中,线程池中的线程比以前使用的 java 版本中的线程更多。由于每个线程单独连接到数据库,因此并行数据库连接过多,应用程序会出现问题。
使用并行流处理 (
.stream().parallel()
),您无法控制有多少线程将处理该问题。相反,您可以使用执行器来更好地控制。
ExecutorService es = Executors.newFixedThreadPool(5); // assuming five threads is your limit
A.fetchAll().stream().forEach(a -> {
es.submit(new Runnable(){
B.fetchAll().stream().forEach(b -> {
es.submit(new Runnable() {
// do the real work
});
});
});
}
在这种情况下,您肯定知道执行的线程不会超过 5 个 - 即使构建队列的嵌套任务也是从这 5 个线程中运行的。