我正在编写一个 Spring Boot 应用程序,我想在后台运行一些多线程计算。队列中一次可能只有几个到几千个计算,所以我不想仅仅依赖于
ThreadPoolTaskExecutor
的队列容量。解决这个问题的最佳方法是什么?
我想过最后递归地调用
calculate
方法,但据我所知这不是上帝的主意(如果我错了,请纠正我)。
这就是我的应用程序现在的样子:
AsyncConfig.java
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setQueueCapacity(100);
executor.setCorePoolSize(2);
executor.setMaxPoolSize(10);
executor.setThreadNamePrefix("taskExecutor-");
executor.initialize();
return executor;
}
}
CalculationContext.java
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
@Log
@Component
public class CalculationContext {
private final SettingsContext settingsContext;
private final ThreadPoolTaskExecutor taskExecutor;
private final CompletionService<Void> completionService;
private final BlockingQueue<CalculationData> queue = new LinkedBlockingQueue<>();
public CalculationContext(
SettingsContext settingsContext,
@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor
) {
this.settingsContext = settingsContext;
this.taskExecutor = taskExecutor;
this.completionService = new ExecutorCompletionService<>(taskExecutor);
}
public void addToQueue(CalculationData data) {
queue.add(data);
startScraping();
}
public void startCalculation() {
if (queue.isEmpty()) return;
if (getAvailableThreads() == 0) return;
do {
try {
CalculationData data = queue.take();
scrapePage(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (getAvailableThreads() > 0);
}
@Async
public void calculate(CalculationData data) {
//This is where all logic will be
}
private int getAvailableThreads() {
return taskExecutor.getMaxPoolSize() - taskExecutor.getActiveCount();
}
}
异步方法大多是即发即忘的。你调用它,但你不知道它什么时候执行,或者什么时候完成。
幸运的是,创建异步方法展示了如何在其
GitHubLookupService
类中执行此操作 - 返回 CompletableFuture<Void>
而不是 Void
:
@Async
public void calculate(CalculationData data) {
//This is where all logic will be
return CompletableFuture.completedFuture(null);
}
现在您可以定义调用它时下一步要做什么:
calculationContext.calculate(data)
.thenRun(() -> {
//This code is called after calculate finishes
});