我有一个不对称拆分的递归任务。它不是分成两半,而是咬住一部分它可以执行的工作,然后分叉其余的工作。这种情况并没有像预期的那样并行。
考虑以下代码以理解我的意思:
public class Sandbox {
public static void main(String[] args) {
var pool = new ForkJoinPool(5);
var task = new Task(20, 1);
var start = Instant.now();
pool.invoke(task);
System.out.println("Elapsed Time: " + Duration.between(start, Instant.now()).toSeconds());
}
static class Task extends RecursiveAction {
int work;
int taskNum;
public Task(int work, int taskNum) {
this.work = work;
this.taskNum = taskNum;
}
protected void compute() {
System.out.println(Thread.currentThread().getName() + " compute enter: " + this);
if (work > 1) {
// bite ONE from work counter
var w = new Task(1, taskNum);
// split the remaining work and fork
var s = new Task(work - 1, taskNum + 1);
s.fork();
w.compute();
s.join();
} else {
doWork();
}
System.out.println(Thread.currentThread().getName() + " compute exit: " + this);
}
void doWork() {
try {
System.out.println(Thread.currentThread().getName() + " working: " + this);
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public String toString() {
return "Task{" + "work=" + work + ", taskNum=" + taskNum + '}';
}
}
}
有一个工作计数器,它定义了要做的工作量。每个任务都从那个计数器中取出一个并分叉剩余的工作,然后加入剩余的工作直到完成。
我希望这样的任务将与 ForkJoinPool 构造函数中定义的并行度并行(并行度 = 5)。但它是并行启动的,然后似乎只有一个线程在工作,其余线程只是在等待 join() 调用。
请帮助我理解为什么任务直到最后都没有并行。当我在执行计算之前调用 fork() 时,工作队列中总是有一个任务应该被其他线程窃取并进一步拆分。
我用
invokeAll
试过你的例子,结果似乎更平衡(也快了大约 50%)。我会猜测正在发生的事情是被调用的原始线程,领先并基本上在池中的任何其他线程获得机会之前排队所有Task
s,所以它以所有w
结束
调用堆栈上的任务。所以它最终自己完成了 50% 的工作。
class Sandbox {
public static void main( String[] args ) {
var pool = new ForkJoinPool( 5 );
var task = new Task( 20, 1 );
long start = System.nanoTime();
pool.invoke( task );
System.out.println( "time: " + (System.nanoTime() - start) * 1e-9 * 1e3 + "ms" );
}
static class Task extends RecursiveAction {
int work;
int taskNum;
public Task( int work, int taskNum ) {
this.work = work;
this.taskNum = taskNum;
}
protected void compute() {
System.out.println( Thread.currentThread().getName() + " compute enter: " + this );
if( work > 1 ) {
// bite ONE from work counter
var w = new Task( 1, taskNum );
// split the remaining work and fork
var s = new Task( work - 1, taskNum + 1 );
invokeAll( w, s );
// s.fork();
// w.compute();
// s.join();
} else {
doWork();
}
System.out.println( Thread.currentThread().getName() + " compute exit: " + this );
}
void doWork() {
try {
System.out.println( Thread.currentThread().getName() + " working: " + this );
Thread.sleep( 1000 );
} catch( InterruptedException e ) {
Thread.currentThread().interrupt();
}
}
public String toString() {
return "Task{" + "work=" + work + ", taskNum=" + taskNum + '}';
}
}
}