我可以清空本地卡夫卡州立商店

问题描述 投票:1回答:1

目前,我有3个带有150个分区的kafka经纪人。我也有3个使用者,每个使用者都分配给一组分区。每个消费者都有自己的使用rocksdb的本地状态存储。在grpc调用期间将调用此内存键值存储。在重新平衡过程中(如果某个消费者消失了),那么数据将被写入其他消费者的本地存储中。

如果使用者运行了大约2周,则似乎服务用完了内存。是否有解决本地存储增长过多的解决方案?我们可以删除不再需要的分区数据吗?还是在还原使用者后有办法删除存储的数据?

apache-kafka stream apache-kafka-streams rocksdb
1个回答
1
投票

您可以使用cleanUp();启动或关闭Kafka Stream到清理状态存储时的方法。

KafkaStreams app = new KafkaStreams(builder.build(), props);
// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under
// certain conditions.  See tip on `cleanUp()` below.
app.cleanUp();

app.start();

通过删除有关以下内容的所有数据来清理本地StateStore到应用程序ID。在此之前只能被调用KafkaStreams实例通过调用start()方法启动或通过调用close()方法关闭实例后。

Tip

为了避免相应的恢复开销,您不应致电默认情况下为cleanUp(),但仅在确实需要时才使用。否则,您将清除本地状态并触发昂贵的状态恢复。您不会丢失数据,程序仍然正确,但是可能会大大降低启动速度(取决于状态的大小)

如果您希望在Kafka Stream的生命周期中从状态存储中删除,则可以在将其所有正确的map存储都保存在rockDB中之后,将其很好地从状态存储中删除

假设您正在使用Kafka流处理器

KeyValueStore<String, String> dsStore=(KeyValueStore<String, String>) context.getStateStore("localstorename");
KeyValueIterator<String, String> iter = this.dsStore.all();
                while (iter.hasNext()) {
                KeyValue<String, String> entry = iter.next();
                dsStore.delete(entry.key);
                }
© www.soinside.com 2019 - 2024. All rights reserved.