使用 Spring、Camel 和 Atomikos 进行 ActiveMQ.Advisory.Consumer.Queue 主题泛洪

问题描述 投票:0回答:1

我想知道是否有人在使用 Atomikos + Camel + ActiveMQ Classic 组合时遇到过这个问题。我正在使用这个组合以事务处理方式从队列中剥离消息。看起来效果不错。

问题是我现在的情况是需要在ActiveMQ中打开咨询消息。这样做之后,我注意到所有队列都在不断地重新创建连接。

ActiveMQ.Advisory.Consumer.Queue
话题的泛滥就证明了这一点。这在 DEBUG 日志记录中也很明显,因为它不断创建连接、打开事务、提交事务并关闭连接。这种情况在没有任何实际应用程序生成消息的情况下发生。所有其他非事务队列/主题都不存在此问题。我在其他几篇文章中读到连接池和缓存可以缓解这个问题。看来我不应该使用缓存并且我已经使用了连接池。我正在使用这个配置:

<bean id="txq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="txJmsConfig" />
</bean>
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="transacted" value="true" />
    <property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
    <property name="transactionManager" ref="jtaTransactionManager" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="poolSize">
        <value>4</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
    init-method="init" destroy-method="shutdownForce">
    <constructor-arg>
        <props>
            <prop key="com.atomikos.icatch.service">
                com.atomikos.icatch.standalone.UserTransactionServiceFactory
            </prop>
            <prop key="com.atomikos.icatch.max_actives">${batch.transactions.concurrent.max}</prop>
        </props>
    </constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close" depends-on="userTransactionService">
    <property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

它使用了

AtomikosConnectionFactoryBean
,我认为它实现了池化。也许我错了?我很想听听是否还有其他人和我一起在这条船上,以及他们做了什么来修复它。

@PeterSmith 建议实施

彼得,谢谢你的建议。我更改了配置以使用

XaPooledConnectionFactory
。 Spring对此并不满意。它认为
XaPooledConnectionFactory
没有实现
XAConnectionFactory

<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close" depends-on="xaJmsPooledConnectionFactory">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="maxPoolSize">
        <value>32</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsPooledConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsPooledConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory"
     init-method="start" destroy-method="stop" depends-on="xaJmsConnectionFactory">
    <property name="maxConnections" value="2" />
    <property name="connectionFactory" ref="xaJmsConnectionFactory" />
</bean> 
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
java.lang.IllegalStateException: Cannot convert value of type [org.apache.activemq.pool.XaPooledConnectionFactory] to required type [javax.jms.XAConnectionFactory] for property 'xaConnectionFactory': no matching editors or conversion strategy found

文档指出

XaPooledConnectionFactory
类实现了
javax.jms.XAConnectionFactory
,所以我在这一点上有点迷失。看来应该可以。

activemq-classic apache-camel spring-jms atomikos
1个回答
1
投票

我也使用上述组合看到了这一点。

Spring DMLC(正在实现camel jms消费者)似乎正在使用consumer.receive()循环来能够在XA事务中登记读取操作。

如果 DMLC 中没有 XA 缓存,甚至可以使用非轮询的 SMLC。

尝试将您的 activemq CF 包装在 org.apache.activemq.pool.PooledConnectionFactory 中,看看是否有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.