Kotlin 协程可以取消协程:
suspend fun forgettingTheBirthDayRoutine() {
coroutineScope {
val workingJob = launch {
workingConsciousness()
}
launch {
delay(2000L)
workingJob.cancel()
workingJob.join()
logger.info("I forgot the birthday! Let's go to the mall!")
}
}
}
调用
workingConsciousness
后,workingJob.cancel()
第一次到达挂起点时,它将被取消,其子协程也将被取消。
是否可以在 Java 中使用结构化并发来实现相同的行为(
StructuredTaskScope
及其子类)?
我所知道的在 Java 中取消任务的唯一方法是为此线程调用
thread.interrupt
。这正是 java.util.concurrent.FutureTask
实现其 cancel
方法的方式 - 它显然试图中断执行其 Runnable
/Callable
的线程。
因此,要尝试取消
StructuredTaskScope.Subtask
,我们只需要中断它执行(或执行)的线程即可。然而,正如OP正确所说,StructuredTaskScope
用户无法访问这样的线程。
为了使该线程可访问,理想的解决方案是实现(可能通过委托)
StructuredTaskScope.Subtask
接口,存储线程引用并使用cancel
方法扩展它。然而,这是不可能做到的,因为这个界面是sealed
。
为了规避这个障碍,并假设我们可以控制
Callable
参数,传递给 StructuredTaskScope.fork
方法,我们可以有以下委托:
class CancellableCallable<V> implements Callable<V> {
private final Callable<V> delegatee;
private Thread thread;
public CancellableCallable(Callable<V> delegatee) {
this.delegatee = delegatee;
}
@Override
public V call() throws Exception {
thread = Thread.currentThread();
return delegatee.call();
}
public Thread getThread() {
return thread;
}
}
然后,如果传递给
Callable
方法的 fork
的确切类型是 CancellableCallable
,则可以取消这样的 SubTask
:
((CancellableCallable<?>) subtask.task()).getThread().interrupt();
为了方便起见,我们可以有一个
StructuredTaskScope
的子类(这是允许的,甚至是建议的),将上面的内容包装在 cancel
方法中:
class CancellableTaskScope<T> extends StructuredTaskScope<T> {
public <U extends T> void cancel(Subtask<U> task) {
if (task.state() != Subtask.State.UNAVAILABLE)
throw new IllegalStateException("Task is not running");
final Callable<? extends U> callable = task.task();
if (callable instanceof CancellableCallable) {
((CancellableCallable<?>) callable).getThread().interrupt();
} else {
throw new IllegalArgumentException("Task is not constructed with CancellableCallable");
}
}
}
很容易看出,上面的代码没有像FutureTask
那样提供
atomic取消,因此可能会尝试中断已经终止的线程,这可能完全没问题,正如
Thread.interrupt
的JavaDoc所述:不存在的线程不需要有任何效果。
上面的用法可能如下所示:
try (CancellableTaskScope<String> scope = new CancellableTaskScope<>()) {
Subtask<String> subtask1 = scope.fork(new CancellableCallable<>(() -> {
try {
Thread.sleep(SOME_INTERVAL);
} catch (InterruptedException e) {
}
return "OK";
}));
Subtask<Integer> subtask2 = scope.fork(...);
scope.cancel(subtask1);
scope.join();
}
请注意,像往常一样,线程中断上的 reaction 的负担完全取决于任务本身,
Callable
实现。已知本机方法会忽略它。这就是为什么上面故意使用“尝试”这个词。在这方面,FutureTask
,当然没有什么不同。