我有一个路由,它使用来自kafka主题的JSON字符串并将数据发送到另一个路由:direct:streamerRoute
from("direct:streamerRoute").routeId("streamerRoute")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
DocumentContext jsonContext = JsonPath.using(configuration).parse(exchange.getIn().getBody(String.class));
String tableName = jsonContext.read(eventTableNameInputExpr);
String eventId = jsonContext.read(eventIdInputExpr);
log.info("eventId: {}, tableName={}", eventId, tableName);
exchange.setProperty("staticDataYes", isStaticData(tableName));
exchange.setProperty("transactionDataYes", isTransactionData(tableName));
}
})
.choice()
.when(exchangeProperty("staticDataYes").isEqualTo(true))
.to("direct:StaticData")
.when(exchangeProperty("transactionDataYes").isEqualTo(true))
.to("direct:TransactionData")
.otherwise()
.log("The event is not either static data of transaction data")
.end();
这些 direct:StaticData 和 direct:TransactionData 进一步只是发布到各自的主题。
我可以逐渐看到我的 POD 内存不断增加,最多可容纳 150K 条消息,达到 4.5 GB。
我不明白可能出了什么问题。
我尝试在本地运行 JProfiler,唯一的观察结果是有多个 byte[] 对象正在累积,并且内存基于此累积而不断增加。
根据从 ChatGPT 获得的一些反馈,我还在我的主要消费者路线中添加了以下内容:
.noMessageHistory()
.streamCaching("false")
这是非常简单的处理,我不希望内存以这种方式增加。即使处理完所有消息后,内存利用率也不会下降。
看起来发生了一些内存泄漏。
如果有人遇到类似情况或任何事情,请帮助恢复。
我设法找到了问题所在。我正在使用camel可观测性库,并使用Jprofiler意识到它以某种方式将对象保存在内存中。删除库后,我的内存和 CPU 利用率都很好,而且这次处理速度相当快