我有一个具有以下结构的 Spring Boot 项目:
调用 API 时,控制器方法 processUser 会调用异步方法,API 不会等待其完成。从该方法中,我调用 UserService 来处理用户。
电流:
控制器 -> 为用户创建一个线程(使用@Async方法) -> UserService(使用@Transactional)
现在,我想改变逻辑,如果一个用户有多个子用户,UserService 应该异步并行处理所有子用户,以节省时间。但是,如果在任何子用户的处理过程中出现错误,我希望整个事务回滚。用户线程应该等待所有子用户线程完成才能完成事务。
所需流量:
控制器 -> 为用户创建一个线程(使用@Async方法) -> UserService(使用@Transactional)-> 多线程并行 处理(使用@Async方法)-> SubUserService(使用@Transactional)
当前我面临以下异常:
原因:java.lang.IllegalStateException:无线程绑定请求 发现:您是否指的是实际的请求属性之外的属性 Web 请求,或处理原始请求之外的请求 接收线程?如果您实际上是在网络请求中进行操作 并且仍然收到此消息,您的代码可能正在外部运行 DispatcherServlet 的:在这种情况下,使用 RequestContextListener 或 RequestContextFilter 用于公开当前请求。
注意:
@Bean
@Scope(value = "request", proxyMode = ScopedProxyMode.TARGET_CLASS)
public FCSessionDataBeanWebapp sessionDataBean() {
AuthenticationUser authenticationUser = null;
try{
SecurityContext securityContext = SecurityContextHolder.getContext();
OAuth2Authentication authentication = (OAuth2Authentication) securityContext.getAuthentication();
authenticationUser = (AuthenticationUser) authentication.getPrincipal();
}
catch(Exception ex){
log.error(ex.getMessage(), ex);
}
if(authenticationUser != null)
return authenticationUser.getSessionDataBeanWebapp();
else
return new FCSessionDataBeanWebapp();
}
问题:
- 如何解决上面提到的“EntityManager已关闭”异常?
- 有没有办法让子用户线程使用父用户线程的事务?
- 是否有另一种方法可以并行处理子用户,同时仍然使用父用户的事务,例如使用 @TransactionalEventListener 还是其他方法?
这是我当前的实现:
控制器:
@Autowired
UserAsyncService userAsyncService;
@PostMapping("/process-user")
public ResponseEntity<Object> processUser(@RequestBody Integer userId) {
userAsyncService.processUserAsync(userId);
return ResponseEntity.ok().build();
}
用户异步服务:
@Service
public class UserAsyncService {
@Autowired
UserService userService;
@Async("threadPoolTaskExecutor")
public void processUserAsync(Integer userId) throws Exception {
userService.processUser(userId);
}
}
用户服务:
@Service
@Transactional
public class UserService {
@Autowired
SubUserAsyncService subUserAsyncService;
public void processUser(Integer userId) throws Exception {
// process user
// done db process for user
List<Integer> subUsers = userDao.retriveSubUserIds(userId);
List<CompletableFuture<Void>> futures = new ArrayList<>();
for(Integer subUser: subusers) {
// Question : How can the following thread be implemented to use the current thread's transaction, ensuring that all subuser and parentuser data in the database is rolled back in case of an error?
CompletableFuture<Void> future = subUserAsyncService.processSubUser(subUser);
futures.add(future);
}
// Wait for all sunuser threads to complete
for (CompletableFuture<Void> future : futures) {
future.join();
}
executorService.shutdown();
}
}
子用户异步服务:
@Service
public class SubUserAsyncService {
@Autowired
SubUserService subUserService;
@Async("threadPoolTaskExecutor2")
public CompletableFuture<Void> processSubUserAsync(Integer userId) throws Exception {
subUserService.processUser(userId);
}
}
子用户服务:
@Transactional
@Service
public class SubUserService {
@Autowired
SubUserDao subUserDao;
public void processUser(Integer userId) throws Exception {
// process subuser
subUserDao.processUser(userId);
}
}
我将子用户进程分为两部分:
对于第一部分,我使用带有新事务的多线程环境。该事务仅用于获取数据,因此不需要回滚。
在第二部分中,该过程是在原始用户过程方法内完成的。这确保了如果发生任何错误,子用户和父用户的事务都会完全回滚。