我在与 Spring Cloud Stream 集成的 Kafka Streams 应用程序方面遇到问题。问题围绕着一个特定的主题,它的大小不断增加并且从不释放数据:
本主题的保留配置为-1。
生成此重新分区的实现是由以下代码创建的:
@Bean
public BiFunction<KStream<String, Undo>, KStream<String, MyObjectUpdate>, KStream<String, UndoMyObject>> mergeUndoWithMyObjectUpdate() {
return (undo, myObjectUpdate) ->
undo.selectKey((k, v) -> v.getMyObjectDesignator())
.join(myObjectUpdate.selectKey((k, v) -> (v.getMyObject() == null ? null : v.getMyObject().getDesignator())),
this::execute,
JoinWindows.of(Duration.ofSeconds(3)),
StreamJoined.with(Serdes.String(),
new JsonSerde<>(Undo.class),
new JsonSerde<>(MyObjectUpdate.class)))
.filterNot((k, v) -> v == null)
.map((k, v) -> new KeyValue<>(v.getUndo().getId(), v))
.peek((k, v) -> {
log.debug("Merged Undo with MyObjectUpdate -> {}, {}", v.getUndo().getId(), v.getAfterMyObject().getId());
if (repository.getByKey(v.getUndo().getId()).isEmpty()) {
undoStompController.send(v.getUndo().getId(), v.getChanges(), v.getUndo().getSessionId());
}
}
);
}
我最初的反应只是在内部主题中设置保留。
但我注意到它没有使用默认配置,而是被标记为 DYNAMIC_TOPIC_CONFIG(在 AKHQ 中)。经过一番研究后,我发现评论建议不要直接设置这些值并让 Kafka 自行处理保留。
我试图理解:
这是 AKHQ 的屏幕截图,显示了主题信息(我编辑了主题和消费者组名称):
感谢您为解决此 Kafka Streams 问题提供的任何见解或帮助。任何其他信息或建议将不胜感激。
听起来像是权限问题?您需要为 Kafka Streams 提供正确的 ACL,特别是内部主题的“删除”权限:
来自 https://docs.confluence.io/platform/current/streams/developer-guide/security.html
# Allow Streams to read the input topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --topic input-topic1 --topic input-topic2
# Allow Streams to write to the output topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --topic output-topic1 --topic output-topic2
# Allow Streams to manage its own internal topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DELETE --operation WRITE --operation CREATE --resource-pattern-type prefixed --topic team1-streams-app1
# Allow Streams to manage its own consumer groups:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DESCRIBE --group team1-streams-app1
# Allow Streams EOS:
bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --operation DESCRIBE --transactional-id team1-streams-app1 --resource-pattern-type prefixed
重新分区主题配置了无限保留,以防止数据丢失,以防应用程序长时间离线(我们不希望保留启动,并删除未处理的数据)。
为了真正控制主题的大小,Kafka Streams 定期向代理发出“删除记录”请求,以清除已完全处理的旧数据。这个“删除记录”请求需要内部主题的“删除”权限,如果ACL配置不正确,可能会失败,导致无限增长。