我有一个使用 AWS 上的 MSK 集群的
kafkaStreams
应用程序。
我需要清理状态存储(在我的应用程序中使用一些 KTables
后创建)。
我找不到任何方法来访问 MSK 集群的文件系统。
我发现here我可以使用:
KafkaStreams app = new KafkaStreams(builder.build(), props);
// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under
// certain conditions. See tip on `cleanUp()` below.
app.cleanUp();
app.start();
但就我而言,我使用的是 Spring Kafka,并且我的代码中没有该
KafkaStreams
实例,并且应用程序会自动启动。
我还发现,只要删除主题(我的状态存储的输入),状态存储就会被删除,不确定这里需要多少时间,我尝试删除主题,15分钟后状态存储看起来仍然存在所以我只是重新创建了这个话题。
我还发现了有关获取状态存储目录路径并使用应用程序代码删除它的建议,我确信它不会工作,因为该目录同时被应用程序本身使用,因此无法删除,也不确定应用程序是否可以删除集群中的任何内容:
String stateDirectory = config.getString(StreamsConfig.STATE_DIR_CONFIG);
// Delete the state directory using appropriate file operations
我认为解决方案的唯一方法是: 创建一个标点符号或处理器或类似的东西,获取状态存储名称,将其传递给处理器并清理那里的状态存储,这似乎是一个好的解决方案吗?
提前谢谢您。
访问 MSK 集群文件系统的任何方式
您可以通过 SSH 连接到代理 EC2 实例吗?这是唯一可行的方法。
不确定应用程序是否可以删除集群中的任何内容
正确。状态存储存储在您的应用程序运行的位置。 Kafka 集群仅存储 KTable 的压缩内部主题,而不存储任何 RocksDB 实例元数据。
您可以使用
kafka-streams-application-reset.sh
删除集群上的数据。
我正在使用 Spring Kafka,但我的代码中没有该
实例KafkaStreams
你会/应该。它只是被抽象掉了。 https://docs.spring.io/spring-kafka/docs/current/reference/html/#streams-kafka-streams
如果您只有
@KafkaListener
消费者,那么就没有使用 Kafka Streams。
我学到了一些与我之前的问题相关的有用信息,所以我想在这里分享:
正如@OneCricketeer提到的,状态存储是rocksDB数据,它将在应用程序运行的地方创建,因此stateStore不是在MSK集群中创建。
将在 MSK 集群中创建的唯一一件事是更改日志主题,这些主题正如其名称所提到的,记录了 KTable 上发生的所有更改的日志。
因此,如果 stateStore 被删除(例如手动或由于 k8s 上的 pod 重新启动),则可以在应用程序再次启动时使用变更日志主题轻松地重新创建它(RocksDB 从变更日志主题中读取数据并重建当地国营商店)。
默认情况下,状态存储将在 /tmp 目录(windows: C: mp)中创建,如果您使用 k8s,它将位于 pod 中的相同路径中,但您可以更改 KafkaStreams 的
StreamsConfig.STATE_DIR_CONFIG
应用程序更改其默认路径。
注意:您可以通过使用将创建状态存储(rocksDB)的专用卷来避免每次重新启动应用程序时重新创建状态存储的方法(时间消耗)。在 k8s 上,您可以使用持久卷声明 (PVC) 和持久卷 (PV),以便在重新启动应用程序时不删除状态存储。