为什么调用 ForkJoinTask.join() 而不是阻塞线程而不是像预期的那样从工作窃取机制中窃取工作

问题描述 投票:0回答:1

我有一个不对称拆分的递归任务。它不是分成两半,而是咬住一部分它可以执行的工作,然后分叉其余的工作。这种情况并没有像预期的那样并行。

考虑以下代码以理解我的意思:

public class Sandbox {

    public static void main(String[] args) {
        var pool = new ForkJoinPool(5);
        var task = new Task(20, 1);
        var start = Instant.now();
        pool.invoke(task);
        System.out.println("Elapsed Time: " + Duration.between(start, Instant.now()).toSeconds());
    }

    static class Task extends RecursiveAction {

        int work; 
        int taskNum;

        public Task(int work, int taskNum) {
            this.work = work;
            this.taskNum = taskNum;
        }

        protected void compute() {
            System.out.println(Thread.currentThread().getName() + " compute enter: " + this);
            if (work > 1) {
                // bite ONE from work counter
                var w = new Task(1, taskNum);
                // split the remaining work and fork
                var s = new Task(work - 1, taskNum + 1);
                s.fork();
                w.compute();
                s.join();
            } else {
                doWork();
            }
            System.out.println(Thread.currentThread().getName() + " compute exit: " + this);
        }

        void doWork() {
            try {
                System.out.println(Thread.currentThread().getName() + " working: " + this);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public String toString() {
            return "Task{" + "work=" + work + ", taskNum=" + taskNum + '}';
        }
    }
}

有一个工作计数器,它定义了要做的工作量。每个任务都从那个计数器中取出一个并分叉剩余的工作,然后加入剩余的工作直到完成。

我希望这样的任务将与 ForkJoinPool 构造函数中定义的并行度并行(并行度 = 5)。但它是并行启动的,然后似乎只有一个线程在工作,其余线程只是在等待 join() 调用。

请帮助我理解为什么任务直到最后都没有并行。当我在执行计算之前调用 fork() 时,工作队列中总是有一个任务应该被其他线程窃取并进一步拆分。

java multithreading concurrency java.util.concurrent forkjoinpool
1个回答
0
投票

我用

invokeAll
试过你的例子,结果似乎更平衡(也快了大约 50%)。我会猜测正在发生的事情是被调用的原始线程,领先并基本上在池中的任何其他线程获得机会之前排队所有
Task
s,所以它以所有
w结束
调用堆栈上的任务。所以它最终自己完成了 50% 的工作。

class Sandbox {

   public static void main( String[] args ) {
      var pool = new ForkJoinPool( 5 );
      var task = new Task( 20, 1 );
      long start = System.nanoTime();
      pool.invoke( task );
      System.out.println( "time: " + (System.nanoTime() - start) * 1e-9 * 1e3 + "ms" );
   }

   static class Task extends RecursiveAction {

      int work;
      int taskNum;

      public Task( int work, int taskNum ) {
         this.work = work;
         this.taskNum = taskNum;
      }

      protected void compute() {
         System.out.println( Thread.currentThread().getName() + " compute enter: " + this );
         if( work > 1 ) {
            // bite ONE from work counter
            var w = new Task( 1, taskNum );
            // split the remaining work and fork
            var s = new Task( work - 1, taskNum + 1 );
            invokeAll( w, s );
//            s.fork();
//            w.compute();
//            s.join();
         } else {
            doWork();
         }
         System.out.println( Thread.currentThread().getName() + " compute exit: " + this );
      }

      void doWork() {
         try {
            System.out.println( Thread.currentThread().getName() + " working: " + this );
            Thread.sleep( 1000 );
         } catch( InterruptedException e ) {
            Thread.currentThread().interrupt();
         }
      }

      public String toString() {
         return "Task{" + "work=" + work + ", taskNum=" + taskNum + '}';
      }
   }
}
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.