在我的项目中,我同时运行多个查询。这是我目前正在做的事情的例子
在我的存储库中,我正在做这样的事情 “从表名中选择计数(列名)”
columnName 和 tableName 是我从服务传递的变量。假设我有 100 多个列需要计算,那么我必须调用 select 查询 100 多次,这很耗时。是否可以并行运行或至少批量运行?
已经尝试过反应堆堆芯并使用助焊剂,但看起来它仍在按顺序运行。我读过 jdbc 正在阻塞,这就是为什么它需要等待另一个查询完成才能执行另一个查询。但它总是如此还是有其他方式。
为此我正在使用 spring boot 和 jdbc。
编辑: 到目前为止,这是我目前的实施
List<Mono<Count>> countMonoList = new ArrayList<>();
countList.forEach(c -> {
Mono<Count> blockingWrapper = Mono.fromCallable(() -> countRepo.selectQuery(c));
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic());
blockingWrapper.subscribe();
countMonoList.add(blockingWrapper);
});
然后我需要将
List<Mono<Count>>
转换为List<Count>
以进行下一个过程。现在我在运行一些查询后得到数据库连接失败。
这将并行运行到池中的连接数。
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import com.example.demo.count.CountingService;
@SpringBootApplication
public class Springboot3Application {
public static void main(String[] args) {
SpringApplication.run(Springboot3Application.class, args);
}
@Autowired
CountingService countingService;
@EventListener
public void onApplicationEvent(ContextRefreshedEvent event) {
List<String[]> tableInfo = Arrays.asList(new String[][] {new String[] {"countries", "id"}, new String[] {"countries", "name"}});
List<String[]> results = tableInfo.parallelStream().map(entry -> {
int count = countingService.count(entry[0], entry[1]);
return new String[] { entry[0], entry[1], count + "" };
}).collect(Collectors.toList());
}
}
计数服务
public interface CountingService {
int count(String tableName, String tableColumn);
}
CountingServiceImpl
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
@Service
public class CountingServiceImpl implements CountingService {
@Autowired
JdbcTemplate jdbcTemplate;
@Override
public int count(String tableName, String columnName) {
return jdbcTemplate.queryForObject(
"select count(" + columnName + ") from " + tableName, Integer.class);
}
}
Reactor 提供了 2 个 API,通过 Mono.fromCallable 和 Mono.fromRunnable
如果多个调用需要并行发生,那么您可以将其卸载到一个 Schedular(可以为基于 IO 的工作选择 bounded elastic)
private static void computeFromCallable() {
for (int i = 0; i<200; i++){
Mono<Integer> integerMono = Mono
.fromCallable(MTSample::compute)
.subscribeOn(Schedulers.boundedElastic());
integerMono
.subscribe((x) -> System.out.println("return value for thread: " + Thread.currentThread().getName() + " is: " + x));
}
}
static Random rand1 = new Random();
private static int compute() throws InterruptedException {
int i = rand1.nextInt(2000);
int j = rand1.nextInt(10);
System.out.println("sleep delay for thread: " + Thread.currentThread().getName() + " is: " + i);
Thread.sleep(i);
return j;
}