我对 Spring Batch 很陌生,并且遇到了一个无法找到解决方案的问题。
我创建了一个有一个步骤和两个流程的工作:
第1步:
检索合同编号列表(为了简化,使用唯一编号来搜索更多记录)。使用 ItemReader 单块,它将传递单个合同号到下一步。
流程 1:
此流程有一个步骤(读取器,处理器,写入器),其读取器将选择此合同号并检索成员 ID 列表。这些 ID 将以块(10 个)的形式传递给处理器。
处理器将进一步执行多次查询调用,最终为写入器创建参与者详细信息列表。作者会将这些数据分块写入 Workbook 对象。
流程 2:将所有数据写入工作簿后,对象将作为文件发送到远程位置。此过程是使用一个tasklet 完成的,该tasklet 具有将文件发送到建议目的地的所有必要详细信息。
现在,一旦整个过程完成(步骤 1-> 流程 1-> 流程 2),它会检查是否有更多合同详细信息要写入远程位置。
如果是,则从列表中检索另一个合同编号,然后将其传递给流程(流程1和流程2) 一旦处理完所有合约编号,代码就会以 RepeatStatus.FINISHED 完成
添加图表以更好地理解:
它看起来像这样:
Job
-> Step 1 (retrieve Id number list but send a single contract number)
-> Flow 1
-> Reader
-> Processor
-> Writer
-> Flow 2
-> Tasklet (Send file to remote location)
(If all contract-numbers are not processed go to Step 1 and iterate to the next contract-number else finish the job)
我的问题从这里开始:
如果您认为有更好的方法,请提出建议。
如何根据条件从流程 2 跳回到步骤 1?
JobExecutionDecider
。您需要确保步骤 1 即使完成也允许重新启动(参数 allowStartIfComplete=true
)
如何在整个作业的所有步骤和流程之间传递数据? (不使用执行上下文)
如果您不想通过执行上下文共享数据,可以在步骤之间使用共享对象。
如果你改变看待工作的方式,你可以简化逻辑,不需要任何显式循环,并让 Spring 批处理为你处理它。不要使用单个列表并尝试自己迭代内容,而是使用
CompositeItemProcessor
将列表的读取和列表中每个项目的处理链接在一起。作为简化,工作变得
contractNumberList.stream()
.map(contractNumber -> toMembersList(contractNumber))
.map(members -> toParticipantDetails(members))
.map(participant -> toWorkbook(participant))
.forEach(workBook -> saveToRemoteLocation(workbook))
Spring 批次术语可能如下所示:
Item Reader 模拟合约 ID 列表。作业参数可以是一个文件名或其他东西来实现您的“检索合同编号列表”操作。
@Bean
@StepScope
public ItemReader<Integer> contractIdReader(@Value("#{jobParameters['items']}") Integer itemSize) {
final Iterator<Integer> contractNumbers = IntStream.range(1, itemSize).iterator();
return () -> {
if (contractNumbers.hasNext()) {
return contractNumbers.next();
}
return null;
};
}
然后是工作和步骤:
@Bean
public Job chunkJob(JobRepository jobRepository, Step getListOfContractNumbersStep) {
return new JobBuilder("contractToParticipantJob", jobRepository)
.start(getListOfContractNumbersStep)
.build();
}
@Bean
public Step getListOfContractNumbersStep(JobRepository jobRepository,
CompositeItemProcessor<Integer, Participant> contractIdToParticipantProcessor,
PlatformTransactionManager transactionManager,
ItemReader<Integer> contractIdReader) {
return new StepBuilder("getListOfContractNumbersStep", jobRepository)
.<Integer, Participant>chunk(1, transactionManager)
.reader(contractIdReader)
.processor(contractIdToParticipantProcessor)
.writer(chunk -> chunk.getItems().stream()
.map(p -> String.format("Finished Workbook: ContractId: %d. Members Count: %d, Members List: %s",
p.contractId,
p.membersCount,
p.membersList))
.forEach(System.out::println))
.build();
}
还有关键部分,
CompositeItemProcessor
:
@Bean
public CompositeItemProcessor<Integer, Participant> contractIdToParticipantProcessor() {
ItemProcessor<Integer, ContractWithMembers> contractIdToMembersProcessor = item -> {
System.out.println("Processing Contract Id: " + item + " getting members");
return new ContractWithMembers(item);
};
ItemProcessor<ContractWithMembers, Participant> membersToParticipantProcessor = item -> {
System.out.println("Processing Members List: " + item.members + " creating Participant");
return new Participant(item);
};
CompositeItemProcessor<Integer, Participant> compositeItemProcessor = new CompositeItemProcessor<>();
compositeItemProcessor.setDelegates(List.of(contractIdToMembersProcessor, membersToParticipantProcessor));
return compositeItemProcessor;
}
还有一些基本类来表示模型生成一些随机事物来表示成员的检索:
private class ContractWithMembers {
int contractId;
List<String> members;
ContractWithMembers(int contractId) {
this.contractId = contractId;
this.members = new Random().ints(5, 1, 11)
.mapToObj(i -> new Random().ints(i, 97, 123)
.mapToObj(c -> "" + (char) c)
.collect(Collectors.joining()))
.collect(Collectors.toList());
}
}
@Data
private class Participant {
int contractId;
int membersCount;
String membersList;
Participant(ContractWithMembers contractWithMembers) {
this.contractId = contractWithMembers.contractId;
this.membersCount = contractWithMembers.members.size();
this.membersList = contractWithMembers.members.stream().collect(Collectors.joining(","));
}
}
运行时会吐出:
Processing Contract Id: 1 getting members
Processing Members List: [dabke, jnfckped, vjzk, cvbhjb, tabyzt] creating Participant
Finished Workbook: ContractId: 1. Members Count: 5, Members List: dabke,jnfckped,vjzk,cvbhjb,tabyzt
Processing Contract Id: 2 getting members
Processing Members List: [abrggi, xczkknaihv, smjcp, gmkpfqfh, zvsomkrdd] creating Participant
Finished Workbook: ContractId: 2. Members Count: 5, Members List: abrggi,xczkknaihv,smjcp,gmkpfqfh,zvsomkrdd
Processing Contract Id: 3 getting members
Processing Members List: [jjqd, ctbiaitlts, mynsysze, if, btpysfsg] creating Participant
Finished Workbook: ContractId: 3. Members Count: 5, Members List: jjqd,ctbiaitlts,mynsysze,if,btpysfsg
Processing Contract Id: 4 getting members
Processing Members List: [aooukhjbt, fcxkfr, ssxr, tf, glodkho] creating Participant
Finished Workbook: ContractId: 4. Members Count: 5, Members List: aooukhjbt,fcxkfr,ssxr,tf,glodkho
Processing Contract Id: 5 getting members
Processing Members List: [yczz, gktx, wtu, qunfochv, cx] creating Participant
Finished Workbook: ContractId: 5. Members Count: 5, Members List: yczz,gktx,wtu,qunfochv,cx