从数据库获取数据后动态设置块大小

问题描述 投票:0回答:1

我需要在存储在数据库中的 Spring Batch 作业步骤中动态设置块大小,即需要从数据库中获取块大小并将其设置到 bean 中。

我的查询是这样的:

select CHUNK_SIZE from SOME_TABLE_NAME where ID='some_id_param_value'

这里

ID
的值将来自作业参数,该参数是通过随请求传递到
Rest Controller
的请求参数设置的(同时触发批处理作业)

我想从数据库中获取这个

CHUNK_SIZE
并将其动态设置到作业的步骤中。 我们的要求是块大小根据 ID 值而变化,其详细信息存储在数据库表中。例如:

身份证 CHUNK_SIZE
01 1000
02 2500

我知道作业中的bean是在配置时设置的,作业参数是在运行时触发作业时传递的。

编辑:

MahmoudBenHassine 提供的示例使用

@JobScope
并使用
@Value("#{jobParameters['id']}")
访问步骤 bean 中的 jobParameters。我尝试使用
jobExecutionContext
实现类似的方法,如下所示:

  1. 从 db 表中获取 chunkSize StepExecutionListener 的

    beforeStep
    方法并在
    ExecutionContext

  2. @JobScope
    注释步骤 bean 并使用
    @Value("#{jobExecutionContext['chunk']}")
    在步骤中访问它 豆子。

  3. 但是我面临以下错误:

    Error creating bean with name 'scopedTarget.step' defined in class path resource [com/sample/config/SampleBatchConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'step' threw exception; nested exception is java.lang.NullPointerException

它无法从

jobExecutionContext
访问“块”键值,从而抛出
NullPointerException
。 是否需要以某种方式提升它以便可以在步骤 bean 中访问它?如果是的话,我们将不胜感激。

我的控制器类:

@RestController
public class SampleController {

    @Autowired
    JobLauncher sampleJobLauncher;

    @Autowired
    Job sampleJob;
    
    @GetMapping("/launch")
    public BatchStatus launch(@RequestParam(name = "id", required = true) String id){

        Map<String, JobParameter> map = new HashMap<>();
        map.put("id",  new JobParameter(id));
        map.put("timestamp",  new JobParameter(System.currentTimeMillis));

        JobParameters params = new JobParameters(map);
        JobExecution j = sampleJobLauncher.run(sampleJob, params);

        return j.getStatus();
    }
}   

我的批处理配置类(包含作业和步骤bean):

@Configuration
public class SampleBatchConfig{

    @Autowired
    private JobBuilderFactory myJobBuilderFactory;

    @Autowired
    private StepBuilderFactory myStepBuilderFactory;

    @Autowired
    private MyRepoClass myRepo; // this class contains the jdbc method to fetch chunksize from the db table
    
    @Autowired
    MyReader myReader;
    
    @Autowired
    MyWriter myWriter;
    
    @Bean
    @JobScope
    public Step sampleStep(@Value("#{jobExecutionContext['chunk']}") Integer chunkSize){
        return myStepBuilderFactory.get("sampleStep")
                .<MyClass, MyClass>chunk(chunkSize) //TODO ~instead of hardcoding the chunkSize or getting it from the properties file using @Value, the requirement is to fetch it from the db table using the above mentioned query with id job parameter and set it here
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())
                .listener(new StepExecutionListener() {
                    @Override
                    public void beforeStep(StepExecution stepExecution) {
                        int chunk = myRepo.findChunkSize(stepExecution.getJobExecution().getExecutionContext().get("id")); // this method call fetches chunksize from the db table using the id job parameter
                        stepExecution.getJobExecution().getExecutionContext().put("chunk", chunk);
                    }

                    @Override
                    public ExitStatus afterStep(StepExecution stepExecution) {
                        return null;
                    }
                })
                .build();
    }

    @Bean
    public Job job(){
        return myJobBuilderFactory.get("sampleJob")
                .incrementer(new RunIdIncrementer())
                .start(sampleStep(null))
                .build();
    }

}

注意: 该作业可能有多个具有不同 chunkSizes 的步骤,在这种情况下,将针对每个步骤单独获取 chunkSize。

编辑2: 按如下方式更改我的步骤定义是可行的,但有一个问题。 这里,读者读取一个包含 17 个项目的列表,块大小为 4。

@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
   int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
   return myStepBuilderFactory.get("sampleStep")
                .<MyClass, MyClass>chunk(chunkSize)
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())  
                .listener(new ChunkListenerSupport() {
                    @Override
                    public void afterChunk(ChunkContext context) {
                        System.out.println("MyJob.afterChunk");
                    }

                    @Override
                    public void beforeChunk(ChunkContext context) {
                        System.out.println("MyJob.beforeChunk");
                    }
                })                      
                .build();
}

我第一次从 url 触发作业时,它工作正常并打印以下内容:(块大小在数据库表中设置为 4)

2021-05-03 15:06:44.859  INFO 11924 --- [nio-8081-exec-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [sampleStep]
MyJob.beforeChunk

item = 1

item = 2

item = 3

item = 4

MyJob.afterChunk

MyJob.beforeChunk

item = 5

item = 6

item = 7

item = 8

MyJob.afterChunk

MyJob.beforeChunk

item = 9

item = 10

item = 11

item = 12

MyJob.afterChunk

MyJob.beforeChunk

item = 13

item = 14

item = 15

item = 16

MyJob.afterChunk

MyJob.beforeChunk

item = 17

MyJob.afterChunk

但是如果我再次触发作业,而不重新启动服务器/spring 容器,则会打印以下内容:

2021-05-03 15:11:02.427  INFO 11924 --- [nio-8081-exec-4] o.s.batch.core.job.SimpleStepHandler     : Executing step: [sampleStep]

MyJob.beforeChunk

MyJob.afterChunk

简而言之,当服务器重新启动时,它只能运行一次。但如果不重新启动服务器,它对后续作业执行不起作用。

java spring spring-boot spring-batch
1个回答
3
投票

由于您将 ID 作为作业参数传递,并且您希望在配置步骤时根据该 ID 从数据库动态获取块大小,因此您有两种选择:

使用工作范围的步骤:

@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
   int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
   return myStepBuilderFactory.get("sampleStep")
                .<MyClass, MyClass>chunk(chunkSize)
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())                        
                .build();
}

使用步骤范围的完成策略


@Bean
@StepScope
public SimpleCompletionPolicy chunkCompletionPolicy(@Value("#{jobParameters['id']}") Integer id) {
   int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunkSize from the db table using the id job parameter
   return new SimpleCompletionPolicy(chunkSize);
}


@Bean
public Step sampleStep(PlatformTransactionManager transactionManager, SimpleCompletionPolicy chunkCompletionPolicy){
   return myStepBuilderFactory.get("sampleStep")
                .chunk(chunkCompletionPolicy, transactionManager)
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())                        
                .build();
}

注意:我建议使用步骤范围的完成策略而不是作业范围的步骤。原因是,虽然技术上可行,但不应确定步骤的范围。如果您考虑一下,确定步骤的范围实际上没有意义。这意味着:在运行时启动此步骤之前不要创建该步骤定义,但要在运行时启动该步骤,需要预先定义它 😵u200d💫

© www.soinside.com 2019 - 2024. All rights reserved.