我需要在存储在数据库中的 Spring Batch 作业步骤中动态设置块大小,即需要从数据库中获取块大小并将其设置到 bean 中。
我的查询是这样的:
select CHUNK_SIZE from SOME_TABLE_NAME where ID='some_id_param_value'
这里
ID
的值将来自作业参数,该参数是通过随请求传递到 Rest Controller
的请求参数设置的(同时触发批处理作业)
我想从数据库中获取这个
CHUNK_SIZE
并将其动态设置到作业的步骤中。
我们的要求是块大小根据 ID 值而变化,其详细信息存储在数据库表中。例如:
身份证 | CHUNK_SIZE |
---|---|
01 | 1000 |
02 | 2500 |
我知道作业中的bean是在配置时设置的,作业参数是在运行时触发作业时传递的。
编辑:
MahmoudBenHassine 提供的示例使用
@JobScope
并使用 @Value("#{jobParameters['id']}")
访问步骤 bean 中的 jobParameters。我尝试使用 jobExecutionContext
实现类似的方法,如下所示:
从 db 表中获取 chunkSize StepExecutionListener 的
beforeStep
方法并在
ExecutionContext
。
用
@JobScope
注释步骤 bean 并使用
@Value("#{jobExecutionContext['chunk']}")
在步骤中访问它
豆子。
但是我面临以下错误:
Error creating bean with name 'scopedTarget.step' defined in class path resource [com/sample/config/SampleBatchConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'step' threw exception; nested exception is java.lang.NullPointerException
它无法从
jobExecutionContext
访问“块”键值,从而抛出 NullPointerException
。
是否需要以某种方式提升它以便可以在步骤 bean 中访问它?如果是的话,我们将不胜感激。
我的控制器类:
@RestController
public class SampleController {
@Autowired
JobLauncher sampleJobLauncher;
@Autowired
Job sampleJob;
@GetMapping("/launch")
public BatchStatus launch(@RequestParam(name = "id", required = true) String id){
Map<String, JobParameter> map = new HashMap<>();
map.put("id", new JobParameter(id));
map.put("timestamp", new JobParameter(System.currentTimeMillis));
JobParameters params = new JobParameters(map);
JobExecution j = sampleJobLauncher.run(sampleJob, params);
return j.getStatus();
}
}
我的批处理配置类(包含作业和步骤bean):
@Configuration
public class SampleBatchConfig{
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private MyRepoClass myRepo; // this class contains the jdbc method to fetch chunksize from the db table
@Autowired
MyReader myReader;
@Autowired
MyWriter myWriter;
@Bean
@JobScope
public Step sampleStep(@Value("#{jobExecutionContext['chunk']}") Integer chunkSize){
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize) //TODO ~instead of hardcoding the chunkSize or getting it from the properties file using @Value, the requirement is to fetch it from the db table using the above mentioned query with id job parameter and set it here
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
int chunk = myRepo.findChunkSize(stepExecution.getJobExecution().getExecutionContext().get("id")); // this method call fetches chunksize from the db table using the id job parameter
stepExecution.getJobExecution().getExecutionContext().put("chunk", chunk);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
})
.build();
}
@Bean
public Job job(){
return myJobBuilderFactory.get("sampleJob")
.incrementer(new RunIdIncrementer())
.start(sampleStep(null))
.build();
}
}
注意: 该作业可能有多个具有不同 chunkSizes 的步骤,在这种情况下,将针对每个步骤单独获取 chunkSize。
编辑2: 按如下方式更改我的步骤定义是可行的,但有一个问题。 这里,读者读取一个包含 17 个项目的列表,块大小为 4。
@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.listener(new ChunkListenerSupport() {
@Override
public void afterChunk(ChunkContext context) {
System.out.println("MyJob.afterChunk");
}
@Override
public void beforeChunk(ChunkContext context) {
System.out.println("MyJob.beforeChunk");
}
})
.build();
}
我第一次从 url 触发作业时,它工作正常并打印以下内容:(块大小在数据库表中设置为 4)
2021-05-03 15:06:44.859 INFO 11924 --- [nio-8081-exec-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
MyJob.beforeChunk
item = 1
item = 2
item = 3
item = 4
MyJob.afterChunk
MyJob.beforeChunk
item = 5
item = 6
item = 7
item = 8
MyJob.afterChunk
MyJob.beforeChunk
item = 9
item = 10
item = 11
item = 12
MyJob.afterChunk
MyJob.beforeChunk
item = 13
item = 14
item = 15
item = 16
MyJob.afterChunk
MyJob.beforeChunk
item = 17
MyJob.afterChunk
但是如果我再次触发作业,而不重新启动服务器/spring 容器,则会打印以下内容:
2021-05-03 15:11:02.427 INFO 11924 --- [nio-8081-exec-4] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
MyJob.beforeChunk
MyJob.afterChunk
简而言之,当服务器重新启动时,它只能运行一次。但如果不重新启动服务器,它对后续作业执行不起作用。
由于您将 ID 作为作业参数传递,并且您希望在配置步骤时根据该 ID 从数据库动态获取块大小,因此您有两种选择:
@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.build();
}
@Bean
@StepScope
public SimpleCompletionPolicy chunkCompletionPolicy(@Value("#{jobParameters['id']}") Integer id) {
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunkSize from the db table using the id job parameter
return new SimpleCompletionPolicy(chunkSize);
}
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager, SimpleCompletionPolicy chunkCompletionPolicy){
return myStepBuilderFactory.get("sampleStep")
.chunk(chunkCompletionPolicy, transactionManager)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.build();
}
注意:我建议使用步骤范围的完成策略而不是作业范围的步骤。原因是,虽然技术上可行,但不应确定步骤的范围。如果您考虑一下,确定步骤的范围实际上没有意义。这意味着:在运行时启动此步骤之前不要创建该步骤定义,但要在运行时启动该步骤,需要预先定义它 😵u200d💫