从主题消费并根据消费消息中的主题名称发布到主题时会出现内存峰值

问题描述 投票:0回答:1

我有一个路由,它使用来自kafka主题的JSON字符串并将数据发送到另一个路由:direct:streamerRoute

from("direct:streamerRoute").routeId("streamerRoute")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {

                        DocumentContext jsonContext = JsonPath.using(configuration).parse(exchange.getIn().getBody(String.class));
                        String tableName = jsonContext.read(eventTableNameInputExpr);
                        String eventId = jsonContext.read(eventIdInputExpr);
                        log.info("eventId: {}, tableName={}", eventId, tableName);

                        exchange.setProperty("staticDataYes", isStaticData(tableName));
                        exchange.setProperty("transactionDataYes", isTransactionData(tableName));
                    }
                })
                .choice()
                    .when(exchangeProperty("staticDataYes").isEqualTo(true))
                        .to("direct:StaticData")
                    .when(exchangeProperty("transactionDataYes").isEqualTo(true))
                        .to("direct:TransactionData")
                    .otherwise()
                        .log("The event is not either static data of transaction data")
                .end();

这些 direct:StaticData 和 direct:TransactionData 进一步只是发布到各自的主题。

我可以逐渐看到我的 POD 内存不断增加,最多可容纳 150K 条消息,达到 4.5 GB。

我不明白可能出了什么问题。

我尝试在本地运行 JProfiler,唯一的观察结果是有多个 byte[] 对象正在累积,并且内存基于此累积而不断增加。

根据从 ChatGPT 获得的一些反馈,我还在我的主要消费者路线中添加了以下内容:

.noMessageHistory()
.streamCaching("false")

这是非常简单的处理,我不希望内存以这种方式增加。即使处理完所有消息后,内存利用率也不会下降。

看起来发生了一些内存泄漏。

如果有人遇到类似情况或任何事情,请帮助恢复。

apache-camel camel-kafka-connector
1个回答
0
投票

我设法找到了问题所在。我正在使用camel可观测性库,并使用Jprofiler意识到它以某种方式将对象保存在内存中。删除库后,我的内存和 CPU 利用率都很好,而且这次处理速度相当快

© www.soinside.com 2019 - 2024. All rights reserved.