我有一个无状态EJB,它将数据插入数据库,立即发送响应,并在最后一步调用异步EJB。异步EJB可以运行很长时间(我的意思是5-10分钟,这比JPA事务超时时间长)。异步ejb需要读取(并处理它)与无状态EJB持久化的记录树(仅读取)相同的记录树。
似乎异步bean在statelsss EJB提交或插入(JPA)之前尝试读取记录树,因此异步bean不能看到记录树。
无状态EJB:
@Stateless
public class ReceiverBean {
public void receiverOfIncomingRequest(data) {
long id = persistRequest(data);
sendResponseToJmsBasedOnIncomingData(data);
processorAsyncBean.calculate(id);
}
}
}
异步EJB:
@Stateless
public class ProcessorAsyncBean {
@Asynchronous
public void calculate(id) {
Data data = dao.getById(id); <- DATA IS ALLWAYS NULL HERE!
// the following method going to send
// data to external system via internet (TCP/IP)
Result result = doSomethingForLongWithData(data);
updateData(id, result);
}
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void updateData(id, result) {
dao.update(id, result);
}
也许我可以使用JMS队列将带有ID的信号发送到处理器bean而不是调用asyc ejb(以及消息驱动的bean从数据库读取数据),但我想尽可能避免这种情况。
另一种解决方案是将整个记录树作为分离的JPA对象传递给处理器异步EJB,而不是从数据库中读取数据。
我能否以某种方式使异步EJB在这个结构中运行良好?
- 更新 -
我在考虑使用Weblogic JMS。这里还有另一个问题。如果负载很大,当队列中有100 000或更多数据(这将是正常的)并且没有互联网连接时,队列中的所有数据都将失败。如果在通过互联网发送数据期间出现异常(或任何)(通过doSomethingForLongWithData
方法),数据将根据Weblogic的redelivery-limit
和repetitaion
设置回滚到原始队列。此回滚事件将在受管服务器中的Weblogic上生成100 000个或更多线程,以管理重新传递。新的后台进程可以杀死或至少减慢服务器的速度。
我也可以使用IBM MQ,因为我们有MQ基础架构。 MQ对Weblogic服务器没有这种影响,但MQ没有重新传递限制和延迟功能。因此,如果出现错误(回滚),消息将立即再次出现在MQ上,没有延迟,我建立了一个手工磨机。 Thread.sleep()
条件下的catch
不是EE应用程序中的解决方案我猜...
似乎异步bean在statelsss EJB提交或插入(JPA)之前尝试读取记录树,因此异步bean不能看到记录树。
这是bean管理事务的预期行为。您正在使用自己的事务上下文从EJB启动异步EJB。异步EJB从不使用调用者事务上下文(参见EJB规范4.5.3)。只要您没有使用持久性的事务隔离级别“read uncommited”,您就不会看到来自调用者的仍未提交的数据。
当异步作业不提交时(例如,应用程序服务器关闭或异常堕胎),您必须考虑这种情况。以下计算和更新是否至关重要?如果未成功执行或甚至不调用,异步过程是否可恢复?
您可以考虑使用bean托管事务,在调用异步EJB之前提交。或者,您可以使用新的transactin上下文将数据更新委派给另一个EJB。这将在异步EJB调用之前提交。对于不加批判的东西,丢失或失败,这通常是可以的。
使用持久性和事务性JMS消息以及死信队列具有可靠处理您的计算和更新的优势,即使在中间停止/启动应用程序服务器或在处理期间出现时间错误也是如此。
您只需要在具有事务标记的方法旁边调用异步方法,因此在提交事务时。
例如,receiverOfIncomingRequest()方法的调用者可以添加
processorAsyncBean.calculate(id);
打电话给它旁边。
更新:扩展示例
CallerMDB
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public void onMessage(Message message) {
long id = receiverBean.receiverOfIncomingRequest(data);
processorAsyncBean.calculate(id);
}
ReceiverBean
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public long receiverOfIncomingRequest(data) {
long id = persistRequest(data);
sendResponseToJmsBasedOnIncomingData(data);
return id;
}