是否可以取消 StructuredTaskScope 中的分叉子任务

问题描述 投票:0回答:1

Kotlin 协程可以取消协程:

suspend fun forgettingTheBirthDayRoutine() {
  coroutineScope {
    val workingJob = launch {
      workingConsciousness()
    }
    launch {
      delay(2000L)
      workingJob.cancel()
      workingJob.join()
      logger.info("I forgot the birthday! Let's go to the mall!")
    }
  }
}

调用

workingConsciousness
后,
workingJob.cancel()
第一次到达挂起点时,它将被取消,其子协程也将被取消。

是否可以在 Java 中使用结构化并发来实现相同的行为(

StructuredTaskScope
及其子类)?

java concurrency java-21 virtual-threads structured-concurrency
1个回答
0
投票

我所知道的在 Java 中取消任务的唯一方法是为此线程调用

thread.interrupt
。这正是
java.util.concurrent.FutureTask
实现其
cancel
方法的方式 - 它显然试图中断执行其
Runnable
/
Callable
的线程。

因此,要尝试取消

StructuredTaskScope.Subtask
,我们只需要中断它执行(或执行)的线程即可。然而,正如OP正确所说,
StructuredTaskScope
用户无法访问这样的线程。

为了使该线程可访问,理想的解决方案是实现(可能通过委托)

StructuredTaskScope.Subtask
接口,存储线程引用并使用
cancel
方法扩展它。然而,这是不可能做到的,因为这个界面是
sealed

为了规避这个障碍,并假设我们可以控制

Callable
参数,传递给
StructuredTaskScope.fork
方法,我们可以有以下委托:

class CancellableCallable<V> implements Callable<V> {

    private final Callable<V> delegatee;
    private Thread thread;

    public CancellableCallable(Callable<V> delegatee) {
        this.delegatee = delegatee;
    }

    @Override
    public V call() throws Exception {
        thread = Thread.currentThread();
        return delegatee.call();
    }

    public Thread getThread() {
        return thread;
    }

}

然后,如果传递给

Callable
方法的
fork
的确切类型是
CancellableCallable
,则可以取消这样的
SubTask
:

((CancellableCallable<?>) subtask.task()).getThread().interrupt();

为了方便起见,我们可以有一个

StructuredTaskScope
的子类(这是允许的,甚至是建议的),将上面的内容包装在
cancel
方法中:

class CancellableTaskScope<T> extends StructuredTaskScope<T> {

    public <U extends T> void cancel(Subtask<U> task) {
        if (task.state() != Subtask.State.UNAVAILABLE) 
           throw new IllegalStateException("Task is not running");
        final Callable<? extends U> callable = task.task();
        if (callable instanceof CancellableCallable) {
            ((CancellableCallable<?>) callable).getThread().interrupt();
        } else {
            throw new IllegalArgumentException("Task is not constructed with CancellableCallable");
        }
    }

}

很容易看出,上面的代码没有像FutureTask那样提供

atomic
取消,因此可能会尝试中断已经终止的线程,这可能完全没问题,正如
Thread.interrupt
的JavaDoc所述:不存在的线程不需要有任何效果。

上面的用法可能如下所示:

try (CancellableTaskScope<String> scope = new CancellableTaskScope<>()) {
    Subtask<String> subtask1 = scope.fork(new CancellableCallable<>(() -> {
        try {
            Thread.sleep(SOME_INTERVAL);
        } catch (InterruptedException e) {
        }
        return "OK";
    })); 
    Subtask<Integer> subtask2 = scope.fork(...);            
    scope.cancel(subtask1);
    scope.join(); 
} 

请注意,像往常一样,线程中断上的 reaction 的负担完全取决于任务本身,

Callable
实现。已知本机方法会忽略它。这就是为什么上面故意使用“尝试”这个词。在这方面,FutureTask,当然没有什么不同。
    

© www.soinside.com 2019 - 2024. All rights reserved.