我有一个 Spring 作业,它使用 hibernate 从数据库检索一些数据(每个作业仅一次),并使用此数据在数据库中插入其他数据,仍然使用 hibernate。
我有一个会话对象,我尝试将其从 CustomJobExecutionListener 传递到 SpringBatchJob 中的 Writer 类,以使用相同的会话进行读取和插入,以免出现此情况
org.springframework.orm.hibernate4.HibernateSystemException: Illegal attempt to associate a collection with two open sessions;
。
在当前设置下,会话将关闭:
ERROR spi.SqlExceptionHelper - PooledConnection has already been closed.
如果我保留相同的代码,但不调用 session.flush() ,则不会发生异常,但不会在数据库中写入任何内容。
这是工作设置:
<batch:partition step="customLoadFile" partitioner="customFilePartitioner">
<batch:handler grid-size="8" task-executor="customJobTaskExecutor"/>
</batch:partition>
</batch:step>
<batch:step id="customLoadFile">
<batch:tasklet transaction-manager="customTransactionManager">
<batch:chunk reader="customFileReader" writer="customFileWriter" commit-interval="5"/>
</batch:tasklet>
</batch:step>
<bean id="customFileWriter" class="com.example.batch.CustomItemWriter">
<property name="customService" ref="customService"/>
</bean>
<bean id="customJobExecutionListener" class="com.example.batch.CustomJobExecutionListener">
<constructor-arg ref="customFileWriter"/>
<constructor-arg ref="customSessionFactory"/>
</bean>
哪里
public class CustomJobExecutionListener implements JobExecutionListener {
private final CustomItemWriter customItemWriter;
private final SessionFactory customSessionFactory;
public CustomJobExecutionListener(CustomItemWriter customItemWriter, SessionFactory customSessionFactory) {
this.customItemWriter = customItemWriter;
this.customSessionFactory = customSessionFactory;
}
@Override
public void beforeJob(JobExecution jobExecution) {
Session session = customSessionFactory.openSession();
customItemWriter.populateDataFromDb(session);
}
@Override
public void afterJob(JobExecution jobExecution) {}
}
public class CustomItemWriter implements ItemWriter<CustomLogModel> {
private CustomService customService;
private Session session;
private void populateDataFromDb(Session session) {
this.session = session;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
System.out.println("Retrieving all db info.");
List<Object[]> data = customService.getAllData(session);
data.forEach(record -> {
ArticleDomain articleSupplement = (ArticleDomain) record[2];
// Do something with articleSupplement
});
stopWatch.stop();
System.out.println("Finished retrieving data.");
}
@Override
public void write(List<? extends CustomLogModel> items) throws Exception {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (CustomLogModel item : items) {
customService.insertData(item, findArticleDomainByFileName(fileName), session);
}
stopWatch.stop();
System.out.println("Finished inserting data for a batch of " + items.size() + " items.");
}
private ArticleDomain findArticleDomainByFileName // Implementation here
}
public void setCustomService(CustomService customService) {
this.customService = customService;
}
}
public class CustomService {
@Transactional(readOnly = true)
public List<Object[]> getAllData(Session session) {
List<Object[]> dbData = session.createQuery("select i.fileName, i.date, a " + "from ArticleDomain a " + "left join fetch a.ipList i ")
.list();
return dbData;
}
@Transactional
public void insertData(CustomLogModel logModel, ArticleDomain articleDomain, Session session) {
try {
if (articleDomain.ipList && !articleDomain.ipList.empty) {
articleDomain?.ipList?.last()?.isLast = false;
}
def ipRecord = new IpRecord(articleId: logModel.articleId,
fileName: logModel.fileName,
failReason: supplementsLogModel.failReason,
...);
articleDomain.addToIpList(ipRecord);
session.save(articleSupplement);
session.flush();
} catch (Exception ex) {
log.error("Error", ex);
}
}
}
ArticleDomain 对象将有一个 IpRecord 列表,每个 IpRecord 可以具有相同的文件名,但处理日期不同
如异常所示,同一个集合有两个并发会话,导致冲突,因为连接已从另一个会话关闭。
这是因为您在手动处理 Hibernate 会话的同时还使用
@Transactional
注释,这会导致 Spring 在后台打开一个附加会话。我建议保留 @Transactional
注解并让 Spring 框架自动管理事务。
代码可以修改如下:
public class CustomItemWriter implements ItemWriter<CustomLogModel> {
@Autowired
private CustomService customService;
private void populateDataFromDb(Session session) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
System.out.println("Retrieving all db info.");
List<Object[]> data = customService.getAllData(session);
data.forEach(record -> {
ArticleDomain articleSupplement = (ArticleDomain) record[2];
....
});
stopWatch.stop();
System.out.println("Finished retrieving data.");
}
@Override
public void write(List<? extends CustomLogModel> items) throws Exception {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (CustomLogModel item : items) {
customService.insertData(item, findArticleDomainByFileName(fileName), session);
}
stopWatch.stop();
System.out.println("Finished inserting data for a batch of " + items.size() + " items.");
}
....
}
@Service
public class CustomService {
@Transactional(readOnly = true)
public List<Object[]> getAllData() {
....
}
@Transactional
public void insertData(CustomLogModel logModel, ArticleDomain articleDomain) {
....
}
}