ForkJoinPool具有不同的延迟,具有不同风格的相同代码

问题描述 投票:0回答:1

我试图将paralleStream与自定义ForkJoin池一起使用,该任务执行网络调用。当我使用以下样式

pool.submit(() -> {
        ioDelays.parallelStream().forEach(n -> {
            induceRandomSleep(n);
        });
    }).get();

如果我循环并逐个提交任务,所以花费的时间几乎是11次,如下所示:

for (final Integer num : ioDelays) {
        ForkJoinTask<Integer> task =  pool.submit(() -> {
            return induceRandomSleep(num);
        });
        tasks.add(task);
    }
    int count = 0;
    final List<Integer> returnVals = new ArrayList<>();
    tasks.forEach(task -> {
        try {
            returnVals.add(task.get());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    });

如果使用parallelStream,ForkJoinPool.common是否涉及?这是整个程序来模拟上述两种样式

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class FJTPExperiment {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(200);

        List<Integer> ioDelays = new ArrayList<>();
        for (int i = 0; i <2000; i++) {
            ioDelays.add( (int)(300 *Math.random() + 200));
        }
        int originalCount = 0;
        for (Integer val : ioDelays) {
            originalCount += val;
        }
        System.out.println("Expected " + originalCount);
        System.out.println(Thread.currentThread().getName() + " ::::Number of threads in common pool :" + ForkJoinPool.getCommonPoolParallelism());


        long beginTimestamp = System.currentTimeMillis();
        pool.submit(() -> {
            ioDelays.parallelStream().forEach(n -> {
                induceRandomSleep(n);
            });
        }).get();
        long endTimestamp = System.currentTimeMillis();
        System.out.println("Took " + (endTimestamp - beginTimestamp) + " ms");


        List<ForkJoinTask<Integer>> tasks = new ArrayList<>();
        beginTimestamp = System.currentTimeMillis();
        for (final Integer num : ioDelays) {
            ForkJoinTask<Integer> task =  pool.submit(() -> {
                return induceRandomSleep(num);
            });
            tasks.add(task);
        }
        int count = 0;
        final List<Integer> returnVals = new ArrayList<>();
        tasks.forEach(task -> {
            try {
                returnVals.add(task.get());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        });
        endTimestamp = System.currentTimeMillis();
        for (Integer val : returnVals) {
            count += val;
        }
        System.out.println("Count " + count);
        System.out.println("Took " + (endTimestamp - beginTimestamp) + " ms");
    }


    public static int induceRandomSleep(int sleepInterval) {
        System.out.println(Thread.currentThread().getName() + " ::::sleeping for " + sleepInterval + " ms");
        try {
            Thread.sleep(sleepInterval);
            return sleepInterval;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return sleepInterval;
        }
    }
}
java multithreading threadpool
1个回答
0
投票

我最终找到了答案,问题分为两部分:

1)只有一个任务被提交给ForkJoinPool它如何产生多个线程?

查看JDK implementation时,似乎调用了parallelStream时,它会检查当前线程是否为ForkJoinWorkerThread,如果是,则将任务推送到客户ForkJoinPool的队列,否则将其推送到ForkJoinPool.common。这也通过日志验证。

2)如果它的工作原因为什么它很慢?

它很慢,因为并行性不是源于自定义ForkJoinPool的并行性,而是源自ForkJoinPool.common的并行性,默认情况下仅限于Number of CPU cores -1。 JDK实现是hereLEAF_TARGET派生here。如果这必须正常工作那么应该有一个分支从自定义线程池的并行性派生LEAF_TARGET

© www.soinside.com 2019 - 2024. All rights reserved.