Item reader code:
@Bean
public JpaCursorItemReader<ExtAssessments> extAssessmentsDiabeticReader() {
    String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";
    JpaCursorItemReader<ExtAssessments> itemReader = new JpaCursorItemReader<>();
    itemReader.setQueryString(jpqlQuery);
    itemReader.setEntityManagerFactory(entityManagerFactory);
    try {
        itemReader.afterPropertiesSet();
    } catch (Exception e) {
        log.info("Exception in item reader : " + e.getMessage());
    }
    itemReader.setSaveState(true);
    return itemReader;
}
ItemProcessor code -> maps the item to a DiabeticAssessment with ModelMapper; extIdClassMap: used for per-item error handling; updateTargetTableStatusToProcessing: used to update the status to processing and completed:
@Bean
public ItemProcessor<ExtAssessments, DiabeticAssessment> diabeticAssessmentProcessor() {
    return item -> {
        DiabeticAssessment diabeticAssessment = mapper.map(item, DiabeticAssessment.class);
        log.info("set created by as data migration and created date as current date.");
        diabeticAssessment.setCreatedBy(CREATED_BY_DATA_MIGRATION);
        diabeticAssessment.setCreatedDatetime(Date.from(Instant.now()));
        log.info("update ext assessment process status to processing");
        updateTargetTableStatusToProcessing(item);
        log.info("Item has been processed for ext assessment id: " + item.getId());
        int hashCode = diabeticAssessment.hashCode();
        extIdClassMap.put(String.valueOf(hashCode), item.getId());
        return diabeticAssessment;
    };
}
ItemWriter code:
@Transactional
@Bean
public ItemWriter<DiabeticAssessment> diabeticAssessmentWriter() {
    return items -> {
        Boolean isErrorFound;
        String errorMessage;
        for (DiabeticAssessment diabeticAssessment : items) {
            isErrorFound = false;
            errorMessage = "";
            log.info("diabetic assessment data processing: " + diabeticAssessment);
            try {
                Map<String, Object> validate = isValidate(diabeticAssessment);
                if ((Boolean) validate.get(IS_VALIDATE)) {
                    diabeticAssessmentRepository.save(diabeticAssessment);
                } else {
                    isErrorFound = true;
                    errorMessage = validate.get(VALIDATION_ERROR_MESSAGE).toString();
                    // ERROR LOG SAVE METHOD for an individual failure
                    log.info("ERROR JOB ID: --->>> " + jobId + " STEP ID: " + stepId + " Validation error message: "
                            + validate.get(VALIDATION_ERROR_MESSAGE));
                }
            } catch (Exception e) {
                isErrorFound = true;
                errorMessage = "Exception during insertion operation";
                log.info("diabetic assessment data not processed for JOB ID:" + jobId + " And STEP ID: " + stepId);
            }
            if (isErrorFound) {
                MigrationLogRequest migrationLog = MigrationLogRequest.builder()
                        .extId(extIdClassMap.get(String.valueOf(diabeticAssessment.hashCode())))
                        .jobId(jobId)
                        .stepId(stepId)
                        .fromTable(TABLE_EXT_ASSESSMENTS)
                        .toTable(TABLE_DIABETIC_ASSESSMENT)
                        .message(errorMessage)
                        .severity(DATA_MIGRATION_ERROR)
                        .createdAt(Instant.now())
                        .createdBy(CREATED_BY_DATA_MIGRATION)
                        .build();
                migrationLogService.CreateMigraitonLog(migrationLog);
            }
            log.info("assessment processing done for uuid: " + diabeticAssessment.getProfileId());
        }
        updateTargetTableStatusToCompleted();
        boolean isSuccessful = validateRecords();
        // if (!isSuccessful) {
        //     TO DO
        //     ERROR message for a chunk of data that failed processing
        //     loggerMethod(); // error item
        // } else {
        //     loggerMethod(); // success item
        // }
    };
}
Step code:
@Bean
public Step stepDiabeticAssessmentInfo() {
    return stepBuilder.get("stepDiabeticAssessmentInfo")
            .<ExtAssessments, DiabeticAssessment>chunk(300)
            .reader(extAssessmentsDiabeticReader())
            .processor(diabeticAssessmentProcessor())
            .faultTolerant()
            .skip(Exception.class)
            .skipLimit(300)
            .writer(diabeticAssessmentWriter())
            .build();
}
We can see that the chunk size here is 300.
Does the JPQL item reader fetch only 300 rows at a time? Or does it fetch all the data on every read, hurting performance each time?

String jpqlQuery = "from ExtAssessments WHERE processStatus='new' AND type='diabetic'";

If it fetches everything every time, is there any JPQL or item-reader approach to fetch only the first 300 rows with process_status = 'new', then process and save them without fetching all the data?
Perhaps you can think of it as a paged query, where the chunk size represents how many items are read per page, followed by the read, process, and write flow. The next page of chunk-size items is then queried.
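The read → process → write cadence described above can be sketched in plain Java (ChunkFlowDemo and its numbers are made up for illustration; Spring Batch's real chunk loop additionally handles transactions, restart state, and skips):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkFlowDemo {
    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 750; i++) source.add(i); // pretend these are table rows
        int chunkSize = 300;
        int writes = 0;
        List<Integer> chunk = new ArrayList<>(chunkSize);
        for (Integer item : source) {          // reader: delivers one item per read()
            chunk.add(item * 2);               // processor: transforms the single item
            if (chunk.size() == chunkSize) {   // chunk full -> writer gets the whole list
                writes++;                      // writer: one call per chunk
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) writes++;        // final partial chunk is still written
        System.out.println(writes);            // 750 items at chunk size 300 -> 3 writes
    }
}
```

So chunk(300) controls how many processed items are handed to the writer in one call, independently of how the reader obtains the rows.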
I hope I have understood you correctly and can help you.
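For reference: JpaCursorItemReader executes the JPQL query once when the step opens and then streams the results through a cursor, one item per read() call, so it does not re-run the query for every chunk. If you want one SELECT per page instead, Spring Batch also ships a JpaPagingItemReader that issues a new paged query every pageSize items. A minimal sketch, assuming the same entityManagerFactory as your cursor reader (note the ORDER BY, which paging needs for a stable row order; also be aware that filtering on processStatus='new' while the job itself changes that status can make later pages shift):

```java
@Bean
public JpaPagingItemReader<ExtAssessments> extAssessmentsDiabeticPagingReader() {
    JpaPagingItemReader<ExtAssessments> itemReader = new JpaPagingItemReader<>();
    itemReader.setEntityManagerFactory(entityManagerFactory);
    // ORDER BY keeps the page boundaries stable between queries
    itemReader.setQueryString(
            "SELECT e FROM ExtAssessments e "
          + "WHERE e.processStatus = 'new' AND e.type = 'diabetic' "
          + "ORDER BY e.id");
    // each SELECT fetches pageSize rows; aligning it with the chunk size (300)
    // means every chunk is backed by exactly one query
    itemReader.setPageSize(300);
    return itemReader;
}
```

Whether this actually performs better than the cursor reader depends on your database and driver; the cursor variant avoids repeated query planning, while the paging variant keeps memory and transaction scope per page.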