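I am trying to implement a complex job flow in Spring Batch using a combination of multithreaded steps and parallel jobs.
So far I have set up three jobs (1, 2, 3). The first (1) runs before the others, as expected, and completes without problems. The other two (2, 3) are supposed to run in parallel, and each has some parallel steps of its own. All of these jobs are wrapped in JobSteps and then run inside a master job (0).
The problem only occurs with jobs 2 and 3: some of their JobSteps fail, not always at the same point and not always the same JobStep. Here is the stack trace of one such exception: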
2019-01-07 17:35:57,513 ERROR: o.s.b.c.s.AbstractStep [SimpleAsyncTaskExecutor-10] Encountered an error executing step 2 in job0
org.springframework.dao.DataAccessResourceFailureException: Could not increment identity; nested exception is java.sql.SQLTransactionRollbackException: transaction rollback: serialization failure
at org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer.getNextKey(AbstractIdentityColumnMaxValueIncrementer.java:113) ~[spring-jdbc-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer.nextLongValue(AbstractDataFieldMaxValueIncrementer.java:128) ~[spring-jdbc-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.batch.core.repository.dao.JdbcJobExecutionDao.saveJobExecution(JdbcJobExecutionDao.java:151) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:145) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_60]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at com.sun.proxy.$Proxy111.createJobExecution(Unknown Source) ~[?:?]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_60]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at com.sun.proxy.$Proxy181.run(Unknown Source) ~[?:?]
at org.springframework.batch.core.step.job.JobStep.doExecute(JobStep.java:117) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:93) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:90) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_60]
at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run(SimpleAsyncTaskExecutor.java:271) [spring-core-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_60]
Caused by: java.sql.SQLTransactionRollbackException: transaction rollback: serialization failure
at org.hsqldb.jdbc.JDBCUtil.sqlException(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCUtil.sqlException(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCStatement.fetchResult(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCStatement.executeUpdate(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer.getNextKey(AbstractIdentityColumnMaxValueIncrementer.java:110) ~[spring-jdbc-4.3.13.RELEASE.jar:4.3.13.RELEASE]
... 42 more
Caused by: org.hsqldb.HsqlException: transaction rollback: serialization failure
at org.hsqldb.error.Error.error(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.error.Error.error(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.Session.handleAbortTransaction(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.Session.executeCompiledStatement(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.Session.executeDirectStatement(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.Session.execute(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCStatement.fetchResult(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.hsqldb.jdbc.JDBCStatement.executeUpdate(Unknown Source) ~[hsqldb-2.3.5.jar:2.3.5]
at org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer.getNextKey(AbstractIdentityColumnMaxValueIncrementer.java:110) ~[spring-jdbc-4.3.13.RELEASE.jar:4.3.13.RELEASE]
... 42 more
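I did some research, and this kind of error usually appears when the HSQLDB instance used to store the job information is not set up correctly for concurrency. However, I am already using what looks like a good configuration, with the MVCC transaction model: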
@Configuration
public class HSqlDbConfig {
@Primary
@Bean("hsqldbDataSource")
public DataSource hsqldbDataSource() {
final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
dataSource.setDriver(new org.hsqldb.jdbcDriver());
dataSource.setUrl("jdbc:hsqldb:mem:mydb;sql.enforce_strict_size=true;hsqldb.tx=mvcc");
dataSource.setUsername("sa");
dataSource.setPassword("");
return dataSource;
}
}
These are the code snippets I use to configure the jobs, starting with the master job (0):
@Bean
public Job job0(JobBuilderFactory jobBuilderFactory) {
getJobParameters();
jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
Flow job1 = getJob1();
Flow job2 = getJob2();
Flow job3 = getJob3();
Flow splitFlow = getSplitFlow(job2, job3);
return jobBuilderFactory.get("Master Job")
.incrementer(new RunIdIncrementer())
.start(job1)
.next(splitFlow)
.end()
.build();
}
How the flows for jobs 2 and 3 are obtained:
private Flow getJob2() {
Job j2 = (Job) ctx.getBean("job2");
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
Step step0 = getJobStep(j2, extractor);
return new FlowBuilder<Flow>("job2")
.start(step0)
.build();
}
private Flow getJob3() {
Job j3 = (Job) ctx.getBean("job3");
Job j3k = (Job) ctx.getBean("job3K");
Job j3l = (Job) ctx.getBean("job3L");
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
Step step0 = getJobStep(j3, extractor);
Step step1 = getJobStep(j3k, params1);
Step step2 = getJobStep(j3k, params2);
Step step3 = getJobStep(j3l, params3);
Step step4 = getJobStep(j3l, params4);
Flow flow1 = new FlowBuilder<Flow>("flowJ3f1")
.start(step1)
.next(step2)
.build();
Flow flow2 = new FlowBuilder<Flow>("flowJ3f2")
.start(step3)
.next(step4)
.build();
return new FlowBuilder<Flow>("job3")
.start(step0)
.split(taskExecutor)
.add(flow1, flow2)
.build();
}
The two getJobStep() methods:
private Step getJobStep(Job job, JobParametersExtractor extractor) {
return steps.get(job.getName())
.job(job)
.launcher(jobLauncher)
.parametersExtractor(extractor)
.build();
}
private Step getJobStep(Job job, JobParameters jobParameters) {
SimpleJobParametersExtractor extractor = new SimpleJobParametersExtractor();
extractor.setJobParameters(jobParameters);
return steps.get(job.getName())
.job(job)
.launcher(jobLauncher)
.parametersExtractor(extractor)
.build();
}
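The idea is to get this structure working, because parallelizing every task that can be parallelized is a requirement of this project, and it should be robust enough that other parallel jobs besides 2 and 3 can be added. In addition, all jobs and steps have been tested for concurrency, and they work as expected.
I can provide more code if needed. Right now I feel like I am at a dead end, so any help is appreciated.
EDIT: As @MahmoudBenHassine suggested, I have configured a JobRepository, a TransactionManager and a JobLauncher (which solved my first problem), as follows: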
@Bean(name = "myTransactionManager")
public DataSourceTransactionManager transactionManager(@Qualifier("hsqldbDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean(name = "myJobRepository")
public JobRepository jobRepository(@Qualifier("hsqldbDataSource") DataSource dataSource,
@Qualifier("myTransactionManager") DataSourceTransactionManager dataSourceTransactionManager) throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(dataSourceTransactionManager);
factory.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
factory.afterPropertiesSet();
return factory.getObject();
}
@Bean(name = "myJobLauncher")
public JobLauncher getJobLauncher(@Qualifier("myJobRepository") JobRepository jobRepository) throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
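I then run job 0 with the new JobLauncher, and this exception occurs on the JobSteps of job 3 (3K and 3L).
EDIT: I have a larger log extract that gives more context about what is happening: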
2019-01-09 09:53:39,793 INFO: o.s.b.c.j.SimpleStepHandler [MainTaskExecutor11] Executing step: [3K]
2019-01-09 09:53:39,811 INFO: o.s.b.c.l.s.SimpleJobLauncher [MainTaskExecutor11] Job: [FlowJob: [name=3K]] launched with the following parameters: [{process=Job 3K BAR, pos_cod=BAR, per_event=EVENT, isRet=false, UNIQUE=-2347943936040182027}]
2019-01-09 09:53:39,821 INFO: o.s.b.c.j.SimpleStepHandler [MainTaskExecutor11] Executing step: [[3K] Job 3K]
2019-01-09 09:53:39,822 INFO: e.i.l.d.l.StepListener [MainTaskExecutor11] Executing Step: [3K] Job 3K
2019-01-09 09:53:41,120 INFO: e.i.l.d.l.StepListener [MainTaskExecutor11] Write Count: 0
2019-01-09 09:53:41,166 INFO: o.s.b.c.l.s.SimpleJobLauncher [MainTaskExecutor11] Job: [FlowJob: [name=3K]] completed with the following parameters: [{process=Job 3K BAR, pos_cod=BAR, per_event=EVENT, isRet=false, UNIQUE=-2347943936040182027}] and the following status: [COMPLETED]
2019-01-09 09:53:41,189 INFO: o.s.b.c.j.SimpleStepHandler [MainTaskExecutor11] Duplicate step [3K] detected in execution of job=[job 0]. If either step fails, both will be executed again on restart.
2019-01-09 09:53:41,191 INFO: o.s.b.c.j.SimpleStepHandler [MainTaskExecutor11] Executing step: [3K]
2019-01-09 09:53:41,201 ERROR: o.s.b.c.s.AbstractStep [MainTaskExecutor11] Encountered an error executing step 3K in job 0
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={process=Job 3K BAR, pos_cod=BAR, per_event=EVENT, isRet=false, UNIQUE=-2347943936040182027}. If you want to run this job again, change the parameters.
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:1.8.0_60]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_60]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at com.sun.proxy.$Proxy111.createJobExecution(Unknown Source) ~[?:?]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.step.job.JobStep.doExecute(JobStep.java:117) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:93) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:90) [spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_60]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_60]
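Note that in getJob3() I use different JobParameters for each execution of jobs 3K and 3L (the same exception is thrown for the latter as well). The JobParameters I use are shown below.
EDIT: I have added a Random function to include a unique JobParameter: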
Random randomizer = new Random(System.currentTimeMillis());
params1 = new JobParametersBuilder()
.addString(PROCESS, "Job 3K BAR" )
.addString("pos_cod", "BAR")
.addString("per_event", "EVENT")
.addString(IS_RET, FALSE)
.addLong("UNIQUE", randomizer.nextLong())
.toJobParameters();
params2 = new JobParametersBuilder()
.addString(PROCESS, "Job 3K 704" )
.addString("pos_cod", "704")
.addString("per_event", "EVENT")
.addString(IS_RET, FALSE)
.addLong("UNIQUE", randomizer.nextLong())
.toJobParameters();
params3 = new JobParametersBuilder()
.addString(PROCESS, "Job 3L BAR" )
.addString("pos_cod", "BAR")
.addString("per_event", "RET_EVENT")
.addString(IS_RET, FALSE)
.addLong("UNIQUE", randomizer.nextLong())
.toJobParameters();
params4 = new JobParametersBuilder()
.addString(PROCESS, "Job 3L 704" )
.addString("pos_cod", "704")
.addString("per_event", "RET_EVENT")
.addString(IS_RET, FALSE)
.addLong("UNIQUE", randomizer.nextLong())
.toJobParameters();
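One caveat with the approach above: a Random seeded with System.currentTimeMillis() produces the same sequence if two instances are created within the same millisecond, so the values are not strictly guaranteed to be unique. A minimal sketch of an alternative, assuming a string-valued parameter is acceptable (the class name UniqueRunIds and the method nextRunId are made up for illustration), could look like this:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper, not part of the project above.
public class UniqueRunIds {

    /** Returns a random (version 4) UUID as a string, independent of the clock. */
    static String nextRunId() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // Generate one id per parameter set (params1..params4 above) and count the distinct ones.
        Set<String> seen = new HashSet<>();
        for (int i = 0; i < 4; i++) {
            seen.add(nextRunId());
        }
        System.out.println(seen.size() + " distinct ids generated");
    }
}
```

It would replace the addLong call with something like .addString("UNIQUE", UniqueRunIds.nextRunId()) in each builder.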
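Given that the duplicate job instance error keeps occurring because the JobLauncher is trying to launch the same job (rather than its sibling with the other JobParameters), I am inclined to think this is a problem with my JobRepository, but that is little more than speculation.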
The Map-based job repository is not intended for multithreaded use. By default, this job repository is configured with a ResourcelessTransactionManager.
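Make sure you use a JDBC-based job repository (even with an in-memory database, as in your case) together with a DataSourceTransactionManager to properly support multithreading and parallel processing.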
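One way to get there is sketched below. It assumes the batch infrastructure comes from @EnableBatchProcessing and that the Spring Batch metadata tables already exist in the database behind hsqldbDataSource; the class name BatchInfrastructureConfig is made up for illustration. As far as I know, DefaultBatchConfigurer only falls back to the Map-based repository when it has no DataSource, so handing it one explicitly should give you a JDBC-based repository with a DataSourceTransactionManager.

```java
import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; if @EnableBatchProcessing is already
// declared elsewhere in your project, only the bean below is needed.
@Configuration
@EnableBatchProcessing
public class BatchInfrastructureConfig {

    // Passing the DataSource explicitly avoids relying on autowiring to
    // pick the right one when several DataSource beans are defined.
    @Bean
    public BatchConfigurer batchConfigurer(@Qualifier("hsqldbDataSource") DataSource dataSource) {
        return new DefaultBatchConfigurer(dataSource);
    }
}
```

If you prefer to declare the JobRepository, transaction manager and JobLauncher beans by hand, the same requirement applies: all of them should be built on the same DataSource.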
As a side note, SimpleDriverDataSource is not a pooled data source; I would recommend using a data source with a connection pool.
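As a rough sketch of what that could look like, here is the same HSQLDB configuration backed by HikariCP. HikariCP is my choice for the example and is not something your project already uses, so treat the dependency, the class name PooledHSqlDbConfig and the pool size as assumptions; any pooling DataSource would do.

```java
import javax.sql.DataSource;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

// Hypothetical replacement for HSqlDbConfig; requires HikariCP on the classpath.
@Configuration
public class PooledHSqlDbConfig {

    @Primary
    @Bean(name = "hsqldbDataSource", destroyMethod = "close")
    public DataSource hsqldbDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        // Same URL and credentials as in the question.
        dataSource.setJdbcUrl("jdbc:hsqldb:mem:mydb;sql.enforce_strict_size=true;hsqldb.tx=mvcc");
        dataSource.setUsername("sa");
        dataSource.setPassword("");
        // Assumed value: size the pool to at least the number of threads
        // that may touch the job repository at the same time.
        dataSource.setMaximumPoolSize(10);
        return dataSource;
    }
}
```

With a pool, each thread borrows an existing connection instead of opening a new one for every repository call, which is what SimpleDriverDataSource does.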