我正在使用 Mule ESB 3.9 来消费来自 ActiveMQ 的消息。
消息实际上需要依次经过大约 15 个域(domain)组件才能到达最终目的地。
我已经为 JMS 配置了一个连接池,使用的消费者数量为 32,接收器(receiver)线程配置中的最大线程数为 500。
但是我发现,在每秒不超过 4 条消息时,JMeter 发送的请求都能正常处理;一旦增加到每秒 5 条消息,JMS 的处理速度就会变慢,平均只有每秒约 3 条消息。
对于如何配置消费者数量和接收器线程数,有什么建议吗?
我的目标是每秒处理 10 条消息,而且我的 15 个组件每个最多只需要 30 毫秒的处理时间。因此,这些组件本身的处理时间没有问题……但是我看到端到端的总处理时间有时约为 3 秒,主要是因为消息没有被立即取走(消费)。
请提出建议
Activemq xml
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <!-- Policy applied to every topic (">" is the match-all wildcard). -->
            <policyEntry topic=">">
                <!-- Cap the number of pending messages retained per consumer at
                     10000 so that slow topic consumers cannot block producers
                     or affect other consumers. Background:
                     http://activemq.apache.org/slow-consumer-handling.html -->
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="10000"/>
                </pendingMessageLimitStrategy>
                <slowConsumerStrategy>
                    <abortSlowConsumerStrategy/>
                </slowConsumerStrategy>
            </policyEntry>
            <!-- Policy applied to every queue (entry added by Mahesh):
                 uses the same abortSlowConsumerStrategy as the topic entry. -->
            <policyEntry queue=">">
                <slowConsumerStrategy>
                    <abortSlowConsumerStrategy/>
                </slowConsumerStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>
<!-- Management context with createConnector switched off. -->
<managementContext>
    <managementContext createConnector="false"/>
</managementContext>
<!-- Broker resource limits: memory as a share of the JVM heap, plus fixed
     limits for the message store and temporary storage. -->
<systemUsage>
    <systemUsage>
        <memoryUsage>
            <!-- 90 percent of the JVM heap. -->
            <memoryUsage percentOfJvmHeap="90"/>
        </memoryUsage>
        <storeUsage>
            <storeUsage limit="5 gb"/>
        </storeUsage>
        <tempUsage>
            <tempUsage limit="5 gb"/>
        </tempUsage>
    </systemUsage>
</systemUsage>
<transportConnectors>
    <!-- DOS protection: limit concurrent connections to 1000 and frame size
         to 100MB on every connector. The separator between URI query options
         is written as "&amp;" because a raw ampersand is not allowed inside an
         XML attribute value; the parser hands the broker a plain "&". -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
有关Mule ESB的其他信息
mule-config ---->
<!-- Spring wiring: component scan of com.test.esb.orm.db plus three imported
     Spring context files. -->
<spring:beans>
    <context:component-scan base-package="com.test.esb.orm.db"/>
    <spring:import resource="classpath*:applicationContext.xml"/>
    <spring:import resource="classpath*:test-orm-beans.xml"/>
    <spring:import resource="mule-domain-spring.xml"/>
</spring:beans>
<!-- Database configuration backed by the testDataSource bean. -->
<db:generic-config doc:name="Generic Database Configuration"
                   name="Generic_Database_Configuration"
                   dataSource-ref="testDataSource"/>
<!-- Track-and-trace database configuration backed by the
     testTrackTraceDataSource bean. -->
<db:generic-config doc:name="Generic Database Configuration for Track Trace"
                   name="Generic_TrackTraceDatabase_Configuration"
                   dataSource-ref="testTrackTraceDataSource"/>
<!-- ESB Gateway: shared HTTP listener. Host and port come from the
     domain.gateway.* properties. -->
<http:listener-config name="test-Shared-http-listener"
                      doc:name="HTTP Listener connector for ESB Gateway (From ui)"
                      host="${domain.gateway.host}"
                      port="${domain.gateway.port}">
    <!-- Worker pool: at most 64 active threads; when exhausted, requests wait
         (threadWaitTimeout 30000, presumably milliseconds). -->
    <http:worker-threading-profile poolExhaustedAction="WAIT"
                                   maxThreadsActive="64"
                                   threadWaitTimeout="30000"/>
</http:listener-config>
<!-- ActiveMQ connector "com.test.esb.trans.jmsConnector": JMS 1.1,
     AUTO_ACKNOWLEDGE, 16 consumers, persistent delivery. Connections are
     obtained from the pooledConnectionFactory bean. -->
<jms:activemq-connector name="com.test.esb.trans.jmsConnector"
                        doc:name="AMQ Connector For Domain Routing Services"
                        specification="1.1"
                        connectionFactory-ref="pooledConnectionFactory"
                        username="${domain.amq.user.id}"
                        password="${domain.amq.user.password}"
                        acknowledgementMode="AUTO_ACKNOWLEDGE"
                        numberOfConsumers="16"
                        persistentDelivery="true"
                        validateConnections="true">
    <!-- Receiver and dispatcher pools: up to 500 active threads each, WAIT
         when exhausted. NOTE(review): 500 is large next to 16 consumers;
         confirm it is intentional. -->
    <receiver-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <dispatcher-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <!-- Blocking reconnection: 15 attempts, frequency 5000 (presumably ms). -->
    <reconnect blocking="true" count="15" frequency="5000"/>
</jms:activemq-connector>
<!-- ActiveMQ connector "com.test.esb.domain.jmsConnector": JMS 1.1,
     AUTO_ACKNOWLEDGE, 16 consumers, non-persistent delivery. Connections are
     obtained from the pooledConnectionFactory bean. -->
<jms:activemq-connector name="com.test.esb.domain.jmsConnector"
                        doc:name="AMQ Connector For Domain Routing Services"
                        specification="1.1"
                        connectionFactory-ref="pooledConnectionFactory"
                        username="${domain.amq.user.id}"
                        password="${domain.amq.user.password}"
                        acknowledgementMode="AUTO_ACKNOWLEDGE"
                        numberOfConsumers="16"
                        persistentDelivery="false"
                        validateConnections="true">
    <!-- Receiver and dispatcher pools: up to 500 active threads each, WAIT
         when exhausted. -->
    <receiver-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <dispatcher-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <!-- Blocking reconnection: 15 attempts, frequency 5000 (presumably ms). -->
    <reconnect blocking="true" count="15" frequency="5000"/>
</jms:activemq-connector>
<!-- ActiveMQ connector "com.test.esb.domain.orchestrator.jmsConnector":
     JMS 1.1, AUTO_ACKNOWLEDGE, 16 consumers, persistent delivery. Connections
     are obtained from the pooledConnectionFactory bean. -->
<jms:activemq-connector name="com.test.esb.domain.orchestrator.jmsConnector"
                        doc:name="AMQ Connector For Domain Routing Services"
                        specification="1.1"
                        connectionFactory-ref="pooledConnectionFactory"
                        username="${domain.amq.user.id}"
                        password="${domain.amq.user.password}"
                        acknowledgementMode="AUTO_ACKNOWLEDGE"
                        numberOfConsumers="16"
                        persistentDelivery="true"
                        validateConnections="true">
    <!-- Receiver and dispatcher pools: up to 500 active threads each, WAIT
         when exhausted. -->
    <receiver-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <dispatcher-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <!-- Blocking reconnection: 15 attempts, frequency 5000 (presumably ms). -->
    <reconnect blocking="true" count="15" frequency="5000"/>
</jms:activemq-connector>
<!-- ActiveMQ connector for the interface2 interface: JMS 1.1, 16 concurrent
     transacted receivers, non-persistent delivery. Connections are obtained
     from the pooledConnectionFactory bean. -->
<jms:activemq-connector name="com.test.esb.interface2.jmsConnector"
                        doc:name="AMQ Connector for interface2 Interface"
                        specification="1.1"
                        connectionFactory-ref="pooledConnectionFactory"
                        username="${domain.amq.user.id}"
                        password="${domain.amq.user.password}"
                        numberOfConcurrentTransactedReceivers="16"
                        persistentDelivery="false"
                        validateConnections="true">
    <!-- Receiver pool: up to 500 active threads, WAIT when exhausted with a
         threadWaitTimeout of 30000 (presumably ms). -->
    <receiver-threading-profile poolExhaustedAction="WAIT"
                                maxThreadsActive="500"
                                threadWaitTimeout="30000"/>
    <!-- Dispatcher pool: up to 500 active threads, WAIT when exhausted. -->
    <dispatcher-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <!-- Blocking reconnection: 15 attempts, frequency 5000 (presumably ms). -->
    <reconnect blocking="true" count="15" frequency="5000"/>
</jms:activemq-connector>
<!-- ActiveMQ connector for the DestinationRouter: JMS 1.1, 8 concurrent
     transacted receivers, non-persistent delivery. Connections are obtained
     from the pooledConnectionFactory bean. -->
<jms:activemq-connector name="com.test.esb.destination.router.jmsConnector"
                        doc:name="AMQ Connector for DestinationRouter"
                        specification="1.1"
                        connectionFactory-ref="pooledConnectionFactory"
                        username="${domain.amq.user.id}"
                        password="${domain.amq.user.password}"
                        numberOfConcurrentTransactedReceivers="8"
                        persistentDelivery="false"
                        validateConnections="true">
    <!-- Receiver and dispatcher pools: up to 500 active threads each, WAIT
         when exhausted. -->
    <receiver-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <dispatcher-threading-profile poolExhaustedAction="WAIT" maxThreadsActive="500"/>
    <!-- Blocking reconnection: 15 attempts, frequency 5000 (presumably ms). -->
    <reconnect blocking="true" count="15" frequency="5000"/>
</jms:activemq-connector>
<!-- ActiveMQ connector for the Dead Letter Queue: JMS 1.1, 16 concurrent
     transacted receivers, non-persistent delivery. Connections are obtained
     from the pooledConnectionFactory bean. No dispatcher threading profile is
     declared on this connector. -->
<jms:activemq-connector name="com.test.esb.dlq.jmsConnector"
                        doc:name="AMQ Connector for Dead Letter Queue"
                        specification="1.1"
                        connectionFactory-ref="pooledConnectionFactory"
                        username="${domain.amq.user.id}"
                        password="${domain.amq.user.password}"
                        numberOfConcurrentTransactedReceivers="16"
                        persistentDelivery="false"
                        validateConnections="true">
    <!-- Receiver pool: up to 500 active threads, WAIT when exhausted with a
         threadWaitTimeout of 30000 (presumably ms). -->
    <receiver-threading-profile poolExhaustedAction="WAIT"
                                maxThreadsActive="500"
                                threadWaitTimeout="30000"/>
    <!-- Blocking reconnection: 15 attempts, frequency 5000 (presumably ms). -->
    <reconnect blocking="true" count="15" frequency="5000"/>
</jms:activemq-connector>
mule-spring config ---->
<!-- Redelivery policy referenced by the domain connection factories: at most
     5 redeliveries, initial delay 500, exponential back-off disabled.
     NOTE(review): backOffMultiplier and maximumRedeliveryDelay presumably only
     take effect when useExponentialBackOff is true; confirm they are wanted. -->
<spring:bean class="org.apache.activemq.RedeliveryPolicy"
             id="domainRedeliveryPolicy">
    <spring:property name="maximumRedeliveries" value="5"/>
    <spring:property name="initialRedeliveryDelay" value="500"/>
    <spring:property name="maximumRedeliveryDelay" value="10000"/>
    <spring:property name="useExponentialBackOff" value="false"/>
    <spring:property name="backOffMultiplier" value="3"/>
</spring:bean>
<!-- Lazily initialised ActiveMQ connection factory for the domain. The broker
     URL sets jms.prefetchPolicy.all=1 and redelivery follows
     domainRedeliveryPolicy. -->
<spring:bean id="domainConnectionFactory"
             lazy-init="true"
             class="org.apache.activemq.ActiveMQConnectionFactory">
    <spring:property name="brokerURL" value="tcp://192.0.0.0:61616?jms.prefetchPolicy.all=1"/>
    <spring:property name="redeliveryPolicy" ref="domainRedeliveryPolicy"/>
</spring:bean>
<!-- amqExceptionConnectionFactory: lazily initialised ActiveMQ connection
     factory using the failover transport over the broker list in
     domain.amq.failover.url, with redelivery from domainRedeliveryPolicy.
     The jms.prefetchPolicy.all=50 connection option is placed OUTSIDE the
     failover parentheses: options inside the parentheses are appended to the
     nested transport URI (and only to the last one of a comma-separated list),
     whereas "jms." connection options belong on the outer URI. -->
<spring:bean id="amqExceptionConnectionFactory"
             class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
    <spring:property name="brokerURL"
                     value="failover:(${domain.amq.failover.url})?jms.prefetchPolicy.all=50" />
    <spring:property name="redeliveryPolicy" ref="domainRedeliveryPolicy" />
</spring:bean>
<!-- Pooling wrapper around domainConnectionFactory, allowing up to 10000
     connections with up to 10000 active sessions per connection.
     destroy-method="stop" makes Spring shut the pool down when the context is
     closed, so pooled broker connections are released on undeploy/redeploy
     instead of being left open.
     NOTE(review): both limits look far larger than the connectors need;
     confirm they are intentional. -->
<spring:bean id="pooledConnectionFactory"
             class="org.apache.activemq.jms.pool.PooledConnectionFactory"
             destroy-method="stop">
    <spring:property name="connectionFactory" ref="domainConnectionFactory"/>
    <spring:property name="maxConnections" value="10000" />
    <spring:property name="maximumActiveSessionPerConnection" value="10000" />
</spring:bean>
<!-- Redelivery policy for the track-and-trace connection factory: at most 5
     redeliveries, initial delay 500, exponential back-off disabled.
     NOTE(review): backOffMultiplier and maximumRedeliveryDelay presumably only
     take effect when useExponentialBackOff is true; confirm they are wanted. -->
<spring:bean class="org.apache.activemq.RedeliveryPolicy"
             id="trackTraceRedeliveryPolicy">
    <spring:property name="maximumRedeliveries" value="5"/>
    <spring:property name="initialRedeliveryDelay" value="500"/>
    <spring:property name="maximumRedeliveryDelay" value="10000"/>
    <spring:property name="useExponentialBackOff" value="false"/>
    <spring:property name="backOffMultiplier" value="3"/>
</spring:bean>
<!-- Lazily initialised ActiveMQ connection factory for track and trace. It
     uses the failover transport over domain.amq.failover.url and takes its
     redelivery settings from trackTraceRedeliveryPolicy. -->
<spring:bean id="trackTraceConnectionFactory"
             lazy-init="true"
             class="org.apache.activemq.ActiveMQConnectionFactory">
    <spring:property name="brokerURL" value="failover:(${domain.amq.failover.url})"/>
    <!-- Disabled single-broker alternative, kept for reference:
         value="tcp://${domain.amq.host}:${domain.amq.port}" -->
    <spring:property name="redeliveryPolicy" ref="trackTraceRedeliveryPolicy"/>
</spring:bean>
<!-- Lazily initialised ActiveMQ connection factory for interface2. The broker
     URL sets jms.prefetchPolicy.all=1 and redelivery follows
     interface2RedeliveryPolicy. -->
<spring:bean id="interface2ConnectionFactory"
             lazy-init="true"
             class="org.apache.activemq.ActiveMQConnectionFactory">
    <spring:property name="brokerURL" value="tcp://192.0.0.0:61616?jms.prefetchPolicy.all=1"/>
    <!-- Disabled property-driven alternative, kept for reference:
         value="tcp://${domain.amq.host}:${domain.amq.port}" -->
    <spring:property name="redeliveryPolicy" ref="interface2RedeliveryPolicy"/>
</spring:bean>
<!-- Redelivery policy for the interface2 connection factory: at most 5
     redeliveries, initial delay 500, exponential back-off disabled.
     NOTE(review): backOffMultiplier and maximumRedeliveryDelay presumably only
     take effect when useExponentialBackOff is true; confirm they are wanted. -->
<spring:bean class="org.apache.activemq.RedeliveryPolicy"
             id="interface2RedeliveryPolicy">
    <spring:property name="maximumRedeliveries" value="5"/>
    <spring:property name="initialRedeliveryDelay" value="500"/>
    <spring:property name="maximumRedeliveryDelay" value="10000"/>
    <spring:property name="useExponentialBackOff" value="false"/>
    <spring:property name="backOffMultiplier" value="3"/>
</spring:bean>
<!-- Destination Router configuration.
     Lazily initialised ActiveMQ connection factory for the destination
     router. The broker URL sets jms.prefetchPolicy.all=1 and redelivery
     follows destinationRouterRedeliveryPolicy. -->
<spring:bean id="destinationRouterConnectionFactory"
             lazy-init="true"
             class="org.apache.activemq.ActiveMQConnectionFactory">
    <spring:property name="brokerURL" value="tcp://192.0.0.0:61616?jms.prefetchPolicy.all=1"/>
    <!-- Disabled property-driven alternative, kept for reference:
         value="tcp://${domain.amq.host}:${domain.amq.port}" -->
    <spring:property name="redeliveryPolicy" ref="destinationRouterRedeliveryPolicy"/>
</spring:bean>
<!-- Redelivery policy for the destination router connection factory: at most
     5 redeliveries, initial delay 500, exponential back-off disabled.
     NOTE(review): backOffMultiplier and maximumRedeliveryDelay presumably only
     take effect when useExponentialBackOff is true; confirm they are wanted. -->
<spring:bean class="org.apache.activemq.RedeliveryPolicy"
             id="destinationRouterRedeliveryPolicy">
    <spring:property name="maximumRedeliveries" value="5"/>
    <spring:property name="initialRedeliveryDelay" value="500"/>
    <spring:property name="maximumRedeliveryDelay" value="10000"/>
    <spring:property name="useExponentialBackOff" value="false"/>
    <spring:property name="backOffMultiplier" value="3"/>
</spring:bean>
<!-- Dead Letter Queue (DLQ) configuration.
     Lazily initialised ActiveMQ connection factory for the DLQ. It uses the
     failover transport over domain.amq.failover.url and takes its redelivery
     settings from dlqRedeliveryPolicy. -->
<spring:bean id="dlqConnectionFactory"
             lazy-init="true"
             class="org.apache.activemq.ActiveMQConnectionFactory">
    <spring:property name="brokerURL" value="failover:(${domain.amq.failover.url})"/>
    <spring:property name="redeliveryPolicy" ref="dlqRedeliveryPolicy"/>
</spring:bean>
<!-- Redelivery policy for the DLQ connection factory: at most 5 redeliveries,
     initial delay 500, exponential back-off disabled.
     NOTE(review): backOffMultiplier and maximumRedeliveryDelay presumably only
     take effect when useExponentialBackOff is true; confirm they are wanted. -->
<spring:bean class="org.apache.activemq.RedeliveryPolicy"
             id="dlqRedeliveryPolicy">
    <spring:property name="maximumRedeliveries" value="5"/>
    <spring:property name="initialRedeliveryDelay" value="500"/>
    <spring:property name="maximumRedeliveryDelay" value="10000"/>
    <spring:property name="useExponentialBackOff" value="false"/>
    <spring:property name="backOffMultiplier" value="3"/>
</spring:bean>
numberOfConsumers 是一个属性,用于在入站端点没有使用事务时确定消费者线程的数量。如果端点处于事务中,则需要改用 numberOfConcurrentTransactedReceivers。
您确实应该删除 <dispatcher-threading-profile> 和 <receiver-threading-profile> 配置。每个 JMS 连接器配置 500 个线程对您的系统来说太多了:如果真的创建了这些线程,将占用大量资源,而且对这个场景根本没有帮助。只有在确有实际需要时才应设置这些值。
尝试在处理缓慢时捕获线程转储以分析线程在做什么。这应该指出根本原因。