我正在尝试在Apache TomEE中实现分布式事务。换句话说,流程是:
操作1、2和3都是TomEE控制的同一XA事务的一部分。因此,在任何情况下,它们要么全部失败,要么全部成功。
tomee.xml
<?xml version="1.0" encoding="UTF-8"?>
<tomee>
this resource adapter is just necessary to tell tomee to not start internal ActiveMq instance
<Resource id="MyAdapter" type="ActiveMQResourceAdapter">
BrokerXmlConfig
ServerUrl tcp://fakehost:666
</Resource>
<Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
</Resource>
<Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
</Resource>
<Resource id="jms/MyOutgoingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
PhysicalName MY_OUTGOING_QUEUE
</Resource>
<Resource id="jms/MyIncomingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
PhysicalName MY_INCOMING_QUEUE
</Resource>
<Resource id="jdbc/myDBXAPooled" type="DataSource">
XaDataSource myDBXA
DataSourceCreator dbcp
JtaManaged true
UserName TestUser
Password TestPassword
MaxWait 2000
ValidationQuery SELECT 1
MaxActive 15
</Resource>
<Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
Url jdbc:mysql://localhost:3306/test
User TestUser
Password TestPassword
</Resource>
</tomee>
Springconfig.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">
<!-- <jee:jndi-lookup jndi-name="myDBXAPooled" id="myDatasource" resource-ref="true" /> -->
<jee:jndi-lookup jndi-name="jms/MyOutgoingConnFactory" id="myOutgoingConnFactory" resource-ref="true" />
<jee:jndi-lookup jndi-name="jms/MyIncomingConnFactory" id="myIncomingConnFactory" resource-ref="true" />
<jee:jndi-lookup jndi-name="jms/MyOutgoingQueue" id="myOutgoingQueue" resource-ref="true" />
<jee:jndi-lookup jndi-name="jms/MyIncomingQueue" id="myIncomingQueue" resource-ref="true" />
<jee:jndi-lookup jndi-name="jdbc/myDBXAPooled" id="myDatasource" resource-ref="true" />
<tx:jta-transaction-manager/>
<!-- <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> -->
<!-- the previous two ways of getting the transactionManager seems equivalent and both get Geronimo -->
</beans>
SpringConfig.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">
<bean id="messageListener" class="com.test.MyListener">
<property name="connectionFactory" ref="myIncomingConnFactory" />
<property name="destination" ref="myIncomingQueue" />
<!-- <property name="sessionTransacted" value="true" /> -->
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="6" />
<property name="messageListener" ref="myMessageProcessor" />
<property name="transactionManager" ref="transactionManager" />
<property name="taskExecutor" ref="msgListenersTaskExecutor" />
</bean>
<bean id="myMessageProcessor" class="com.test.MyMessageReceiver">
<property name="forwardConnectionFactory" ref="myOutgoingConnFactory" />
<property name="forwardQueue" ref="myOutgoingQueue" />
<property name="datasource" ref="myDatasource" />
</bean>
<bean id="msgListenersTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"/>
</beans>
MyMessageReceiver.java:
package com.test;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
public class MyMessageReceiver implements MessageListener {
static Logger log = Logger.getLogger(MyMessageReceiver.class);
private ConnectionFactory forwardConnectionFactory;
private Queue forwardQueue;
private DataSource datasource;
public void setForwardConnectionFactory(ConnectionFactory connFactory) {
forwardConnectionFactory=connFactory;
}
public void setforwardQueue(Queue queue) {
forwardQueue=queue;
}
public void setDatasource(DataSource ds) {
datasource=ds;
}
@Override
@Transactional(propagation=Propagation.REQUIRED)
public void onMessage(Message message) {
log.info("************************************");
MyListener listener = (MyListener)SpringContext.getBean("messageListener");
listener.printInfo();
log.info("************************************");
TextMessage msg = (TextMessage) message;
String text = null;
try {
text = msg.getText();
if (text != null) log.info("MESSAGE RECEIVED: "+ text);
updateDB(text); // function call to update DB
sendMsg(text); // function call to publish messages to queue
System.out.println("****************Rollback");
// Throwing exception to rollback DB, Message should not be
// published and consumed message sent to a DLQ
//(Broker side DLQ configuration already done)
throw new RuntimeException();
//if (text!=null && text.indexOf("rollback")!=-1) throw new RuntimeException("Message content includes the word rollback");
} catch (Exception e) {
log.error("Rolling back the entire XA transaction");
log.error(e.getMessage());
throw new RuntimeException("Rolled back because of "+e.getMessage());
}
}
private void updateDB(String text) throws Exception {
Connection conn = null;
PreparedStatement ps = null;
try {
System.out.println("*******datasource "+datasource);
conn = datasource.getConnection();
System.out.println("*******conn "+conn.getMetaData().getUserName());
if (conn!=null) {
System.out.println("*******conn "+conn.getMetaData().getUserName());
ps = conn.prepareStatement("INSERT INTO MY_TABLE (name) VALUES(?)");
ps.setString(1, text);
ps.executeUpdate();
}
} catch (Exception e) {
throw e;
} finally {
if (ps!=null) {
try {
ps.close();
} catch (SQLException e) {
log.error(e.getMessage());
// do nothing
}
}
if (conn!=null) {
try {
conn.close();
} catch (SQLException e) {
log.error(e.getMessage());
// do nothing
}
}
}
}
private void sendMsg(String msgToBeSent) throws Exception {
javax.jms.Connection conn = null;
Session session = null;
try {
System.out.println("*************forwardConnectionFactory"+forwardConnectionFactory);
conn = forwardConnectionFactory.createConnection();
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(forwardQueue);
TextMessage msg = session.createTextMessage(msgToBeSent);
messageProducer.send(msg);
} catch (Exception e) {
throw e;
} finally {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// do nothing
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
// do nothing
}
}
}
}
}
MyListener.java:
package com.test;
import javax.transaction.Status;
import javax.transaction.SystemException;
import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.jta.JtaTransactionManager;
public class MyListener extends DefaultMessageListenerContainer {
static Logger log = Logger.getLogger(MyListener.class);
public void printInfo() {
try {
log.info("trans manager="+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager()+","+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().getStatus()+", this.isSessionTransacted()="+this.isSessionTransacted());
log.info("STATUS_ACTIVE="+Status.STATUS_ACTIVE);
log.info("STATUS_COMMITTEDE="+Status.STATUS_COMMITTED);
log.info("STATUS_COMMITTING="+Status.STATUS_COMMITTING);
log.info("STATUS_MARKED_ROLLBACK="+Status.STATUS_MARKED_ROLLBACK);
log.info("STATUS_NO_TRANSACTION="+Status.STATUS_NO_TRANSACTION);
log.info("STATUS_PREPARED="+Status.STATUS_PREPARED);
log.info("STATUS_PREPARING="+Status.STATUS_PREPARING);
log.info("STATUS_ROLLEDBACK="+Status.STATUS_ROLLEDBACK);
log.info("STATUS_ROLLING_BACK="+Status.STATUS_ROLLING_BACK);
log.info("STATUS_UNKNOWN="+Status.STATUS_UNKNOWN);
} catch (SystemException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void forceRollback() {
try {
((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().setRollbackOnly();
} catch (IllegalStateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SystemException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
更新数据库并将消息发送到传出队列后,我故意抛出RuntimeException
只是为了测试数据库和消息代理的事务回滚。
[三个操作在成功的情况下都被提交,但是只有在失败的情况下才回滚数据库操作,而两个JMS操作仍然都被提交。
可能是:
我已经花了很多时间来解决问题并寻找可能的解决方案。
很高兴听到您对此的意见,如果事实证明这是我的错,再次表示歉意。
我相信您需要使用ActiveMQ JCA资源适配器来确保将连接自动注册到XA事务中。试试这个:
<tomee>
<Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
# Do not start the embedded ActiveMQ broker
BrokerXmlConfig =
ServerUrl = tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
</Resource>
<Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory">
resourceAdapter = MyJmsResourceAdapter
transactionSupport = xa
</Resource>
<Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory">
resourceAdapter = MyJmsResourceAdapter
transactionSupport = xa
</Resource>
<Resource id="jms/MyOutgoingQueue" type="javax.jms.Queue"/>
<Resource id="jms/MyIncomingQueue" type="javax.jms.Queue"/>
<Resource id="jdbc/myDBXAPooled" type="DataSource">
XaDataSource myDBXA
DataSourceCreator dbcp
JtaManaged true
UserName TestUser
Password TestPassword
MaxWait 2000
ValidationQuery SELECT 1
MaxActive 15
</Resource>
<Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
Url jdbc:mysql://localhost:3306/test
User TestUser
Password TestPassword
</Resource>
</tomee>