我想知道是否有人在使用 Atomikos + Camel + ActiveMQ Classic 组合时遇到过这个问题。我正在使用这个组合以事务处理方式从队列中剥离消息。看起来效果不错。
问题是我现在的情况是需要在ActiveMQ中打开咨询消息。这样做之后,我注意到所有队列都在不断地重新创建连接。
ActiveMQ.Advisory.Consumer.Queue
话题的泛滥就证明了这一点。这在 DEBUG 日志记录中也很明显,因为它不断创建连接、打开事务、提交事务并关闭连接。这种情况在没有任何实际应用程序生成消息的情况下发生。所有其他非事务队列/主题都不存在此问题。我在其他几篇文章中读到连接池和缓存可以缓解这个问题。看来我不应该使用缓存并且我已经使用了连接池。我正在使用这个配置:
<bean id="txq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="txJmsConfig" />
</bean>
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="atomikosConnectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="transacted" value="true" />
<property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
<property name="transactionManager" ref="jtaTransactionManager" />
<property name="cacheLevelName" value="CACHE_NONE" />
</bean>
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName">
<value>XA-JMS-ATOMIKOS</value>
</property>
<property name="localTransactionMode">
<value>false</value>
</property>
<property name="poolSize">
<value>4</value>
</property>
<property name="xaConnectionFactory">
<ref bean="xaJmsConnectionFactory" />
</property>
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL"
value="${queue.address}?jms.watchTopicAdvisories=false&jms.prefetchPolicy.all=0" />
</bean>
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
init-method="init" destroy-method="shutdownForce">
<constructor-arg>
<props>
<prop key="com.atomikos.icatch.service">
com.atomikos.icatch.standalone.UserTransactionServiceFactory
</prop>
<prop key="com.atomikos.icatch.max_actives">${batch.transactions.concurrent.max}</prop>
</props>
</constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close" depends-on="userTransactionService">
<property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager" />
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
它使用了
AtomikosConnectionFactoryBean
,我认为它实现了池化。也许我错了?我很想听听是否还有其他人和我一起在这条船上,以及他们做了什么来修复它。
@PeterSmith 建议实施
彼得,谢谢你的建议。我更改了配置以使用
XaPooledConnectionFactory
。 Spring对此并不满意。它认为 XaPooledConnectionFactory
没有实现 XAConnectionFactory
。
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close" depends-on="xaJmsPooledConnectionFactory">
<property name="uniqueResourceName">
<value>XA-JMS-ATOMIKOS</value>
</property>
<property name="localTransactionMode">
<value>false</value>
</property>
<property name="maxPoolSize">
<value>32</value>
</property>
<property name="xaConnectionFactory">
<ref bean="xaJmsPooledConnectionFactory" />
</property>
</bean>
<bean id="xaJmsPooledConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory"
init-method="start" destroy-method="stop" depends-on="xaJmsConnectionFactory">
<property name="maxConnections" value="2" />
<property name="connectionFactory" ref="xaJmsConnectionFactory" />
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL"
value="${queue.address}?jms.watchTopicAdvisories=false&jms.prefetchPolicy.all=0" />
</bean>
java.lang.IllegalStateException: Cannot convert value of type [org.apache.activemq.pool.XaPooledConnectionFactory] to required type [javax.jms.XAConnectionFactory] for property 'xaConnectionFactory': no matching editors or conversion strategy found
文档指出
XaPooledConnectionFactory
类实现了 javax.jms.XAConnectionFactory
,所以我在这一点上有点迷失。看来应该可以。
我也使用上述组合看到了这一点。
Spring DMLC(正在实现camel jms消费者)似乎正在使用consumer.receive()循环来能够在XA事务中登记读取操作。
如果 DMLC 中没有 XA 缓存,甚至可以使用非轮询的 SMLC。
尝试将您的 activemq CF 包装在 org.apache.activemq.pool.PooledConnectionFactory 中,看看是否有帮助。