我有一组配置为读取和写入RabbitMQ队列的骆驼路线,或多或少像这样:
from("rabbitmq:$rabbitMQVhost?connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueue}&routingKey=${it.rabbitMQQueue}&SOME_MORE_PROPERTIES")
.log("Read message from queue ${it.rabbitMQQueue}")
.routeId(it.rabbitMQQueue)
.noAutoStartup()
.bean(it.rabbitMQBean)
.choice()
.`when`(PredicateBuilder.and(simple("$myCondition"), isNotNull(body())))
.split(body())
.toD("rabbitmq:$rabbitMQVhost?connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueueDestination}&autoDelete=false&routingKey=${it.rabbitMQQueueDestination}&bridgeEndpoint=true")
.endChoice()
.otherwise()
end()
SOME_MORE_PROPERTIES
基本上是autoDelete=false&autoAck=false
和某些消息预取设置。
我的ConnectionFactory是org.springframework.amqp.rabbit.connection.CachingConnectionFactory
。
[每当一条消息进入我的源队列时,就会启动一个线程来处理它;但是,处理完成后,它将挂起WAIT状态,从未被释放或终止,因此我的应用程序内存过了一会儿就饱和了,垃圾回收器对此无能为力。
运行一段时间后,我的应用程序基本上处于这种状态:
如果我手动重新启动路由,则线程将终止并释放内存。
我的路由配置中出了什么问题,导致线程无法正确终止?
我想避免不得不写石英作业来不时地重新启动路由。
编辑:我最近也从Camel 2.24.0更新到了Camel 3的最新RC,但是问题仍然在发生。
确定原来处于等待状态的线程应该在那里,因为默认情况下,骆驼使用者的线程池大小为10(实际上,我最多有10 * 我的路由数] >)线程。
现在,配置线程池大小是可以完成的事情,但这并不像看起来那样容易,因为有几种不同的方法可以做到。
对我而言,此问题的解决方法是在rabbitMQ URI中设置threadPoolSize
参数:
from("rabbitmq:$rabbitMQVhost?threadPoolSize=5&connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueue}
通过这样做,按预期,在所有路由上处理了几条消息后处于WAIT状态的线程数为5 * 我的路由数
,在我的情况下更好:我没有任何线程并发性要求很高,但是我有很多路由,并且有10个挂起线程以及它们的内存占用,因为每条路由都很快耗尽了我的内存。
因为似乎没有太多关于此主题的文档,所以把它留在这里,我不得不为此花了好几天的时间。