Apache Camel RabbitMQ在等待状态下留下线程

问题描述 投票:2回答:1

我有一组配置为读取和写入RabbitMQ队列的骆驼路线,或多或少像这样:

from("rabbitmq:$rabbitMQVhost?connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueue}&routingKey=${it.rabbitMQQueue}&SOME_MORE_PROPERTIES")
    .log("Read message from queue ${it.rabbitMQQueue}")
    .routeId(it.rabbitMQQueue)
    .noAutoStartup()
    .bean(it.rabbitMQBean)
    .choice()
    .`when`(PredicateBuilder.and(simple("$myCondition"), isNotNull(body())))
        .split(body())
        .toD("rabbitmq:$rabbitMQVhost?connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueueDestination}&autoDelete=false&routingKey=${it.rabbitMQQueueDestination}&bridgeEndpoint=true")
        .endChoice()
    .otherwise()
    end()

SOME_MORE_PROPERTIES基本上是autoDelete=false&autoAck=false和某些消息预取设置。

我的ConnectionFactory是org.springframework.amqp.rabbit.connection.CachingConnectionFactory

[每当一条消息进入我的源队列时,就会启动一个线程来处理它;但是,处理完成后,它将挂起WAIT状态,从未被释放或终止,因此我的应用程序内存过了一会儿就饱和了,垃圾回收器对此无能为力。

运行一段时间后,我的应用程序基本上处于这种状态:

enter image description here

如果我手动重新启动路由,则线程将终止并释放内存。

我的路由配置中出了什么问题,导致线程无法正确终止?

我想避免不得不写石英作业来不时地重新启动路由。

编辑:我最近也从Camel 2.24.0更新到了Camel 3的最新RC,但是问题仍然在发生。

kotlin rabbitmq apache-camel
1个回答
0
投票

确定原来处于等待状态的线程应该在那里,因为默认情况下,骆驼使用者的线程池大小为10(实际上,我最多有10 * 我的路由数] >)线程。

现在,配置线程池大小是可以完成的事情,但这并不像看起来那样容易,因为有几种不同的方法可以做到。

对我而言,此问题的解决方法是在rabbitMQ URI中设置threadPoolSize参数:

from("rabbitmq:$rabbitMQVhost?threadPoolSize=5&connectionFactory=#customConnectionFactory&queue=${it.rabbitMQQueue}

通过这样做,按预期,在所有路由上处理了几条消息后处于WAIT状态的线程数为5 * 我的路由数

,在我的情况下更好:我没有任何线程并发性要求很高,但是我有很多路由,并且有10个挂起线程以及它们的内存占用,因为每条路由都很快耗尽了我的内存。

因为似乎没有太多关于此主题的文档,所以把它留在这里,我不得不为此花了好几天的时间。

© www.soinside.com 2019 - 2024. All rights reserved.