CompletableFuture 未按预期工作并且具有更多数量的额外线程

问题描述 投票:0回答:1

我正在尝试并行化以下操作,即在每次迭代中通过分页获取具有 50000 条记录的所有记录,并将它们写入 s3 文件上的 CSV 文件中。

并行之前:

public void generateStudentFiles(){

  int pageCount =0;
  boolean isEndOfFile=false;
  while(isEndOfFile){
    // step1: fetch the records from the db with pagination
    List<StudentEntity> result = repo.getStudentDataWithPagination(50000,pageCount);
    if(CollectionUtils.isEmpty(result) || result.size() < 50000){
         isEndOfFile = true;
    } else {
      // step2: void method to write records into csv file on s3 location
      writeResultToS3(result);
    }
    pageCount++;

   }

}

并行后:

我想同时写入最多 5 个线程的文件。

public void generateStudentFiles(){
     
      // created 5 threads for concurrency or parallelis 
      ForkJoinPool customPool = new ForkJoinPool(5);    
      CompletableFuture<Void> future = new CompletableFuture<>();

      int pageCount =0;
      boolean isEndOfFile=false;
      while(isEndOfFile){

      CompletableFuture.runAsync(() -> {
          
        // step1: fetch the records from the db with pagination
        List<StudentEntity> result = repo.getStudentDataWithPagination(50000,pageCount);
        if(CollectionUtils.isEmpty(result) || result.size() < 50000){
             isEndOfFile = true;
        } else {
          // step2: void method to write records into csv file on s3 location
          writeResultToS3(result);
        }
        pageCount++;          

      }, customPool);
        
    
       }

future.complete(null);
    
}

上述并行代码的预期输出:

从数据库获取记录并写入 s3 位置的整个操作应在 5 个线程上并行运行,并减少写入文件所需的时间

上述并行代码的实际输出:

有超过 5 个线程(不知道为什么有额外的线程)随机运行并在 s3 位置生成许多空的 CSV 文件。例如,看到的不是 50 个文件,而是 200 个文件,其中 150 个文件是空白的,50 个文件有数据,并且线程执行永远不会结束。关于如何解决这个问题有什么建议吗?

java spring-boot java-8 concurrency completable-future
1个回答
0
投票

我在您的代码中发现了一些问题,但是,我已经更正并测试了代码。现在正在并行工作。

公共静态无效generateStudentFiles(){

    // created 3 threads for concurrency or parallelis
    ExecutorService customPool = Executors.newFixedThreadPool(3);

    int pageCount =0;
    boolean isEndOfFile=true;
    while(isEndOfFile){

        List< StudentEntity> result = getStudentDataWithPagination(50000,pageCount);
        result.stream().map(str -> CompletableFuture.runAsync(()->writeResultToS3(str),customPool)).collect(Collectors.toList());
        pageCount = pageCount++ // you need to check this logic for updating PageCount;

        if(pageCount==50000) // your need to check this logic for breaking the loop 
            isEndOfFile=false;
    }
}  

请尝试这个。它应该对你有用。

  • 这里,writeResultToS3() 仅与创建的 3 个线程并行工作。如有任何疑问,请随时联系
© www.soinside.com 2019 - 2024. All rights reserved.