如何在 Spring Boot 中检测 @Async 方法的结束并运行一些代码;

问题描述 投票:0回答:1

我正在编写一个 Spring Boot 应用程序,我想在后台运行一些多线程计算。队列中一次可能只有几个到几千个计算,所以我不想仅仅依赖于

ThreadPoolTaskExecutor
的队列容量。解决这个问题的最佳方法是什么?

我想过最后递归地调用

calculate
方法,但据我所知这不是上帝的主意(如果我错了,请纠正我)。

这就是我的应用程序现在的样子:

AsyncConfig.java

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setQueueCapacity(100);
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(10);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.initialize();
        return executor;
    }
}

CalculationContext.java

import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;

@Log
@Component
public class CalculationContext {

    private final SettingsContext         settingsContext;
    private final ThreadPoolTaskExecutor  taskExecutor;
    private final CompletionService<Void> completionService;

    private final BlockingQueue<CalculationData> queue = new LinkedBlockingQueue<>();

    public CalculationContext(
            SettingsContext settingsContext,
            @Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor
    ) {
        this.settingsContext   = settingsContext;
        this.taskExecutor      = taskExecutor;
        this.completionService = new ExecutorCompletionService<>(taskExecutor);
    }

    public void addToQueue(CalculationData data) {
        queue.add(data);
        startScraping();
    }

    public void startCalculation() {
        if (queue.isEmpty()) return;
        if (getAvailableThreads() == 0) return;

        do {
            try {
                CalculationData data = queue.take();
                scrapePage(data);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } while (getAvailableThreads() > 0);


    }

    @Async
    public void calculate(CalculationData data) {
        //This is where all logic will be
    }

    private int getAvailableThreads() {
        return taskExecutor.getMaxPoolSize() - taskExecutor.getActiveCount();
    }
}
java spring-boot multithreading
1个回答
0
投票

异步方法大多是即发即忘的。你调用它,但你不知道它什么时候执行,或者什么时候完成。

幸运的是,创建异步方法展示了如何在其

GitHubLookupService 
类中执行此操作 - 返回
CompletableFuture<Void>
而不是
Void

@Async
public void calculate(CalculationData data) {
    //This is where all logic will be
    return CompletableFuture.completedFuture(null);
}

现在您可以定义调用它时下一步要做什么:

calculationContext.calculate(data)
        .thenRun(() -> {
             //This code is called after calculate finishes
        });
© www.soinside.com 2019 - 2024. All rights reserved.