Kafka Streams 主题保留问题:动态主题配置和未发布的数据

问题描述 投票:0回答:1

我在与 Spring Cloud Stream 集成的 Kafka Streams 应用程序方面遇到问题。问题围绕着一个特定的主题,它的大小不断增加并且从不释放数据:

  • ApplicationId-MergeUndoMyObject-KSTREAM-KEY-SELECT-0000000003-重新分区

本主题的保留配置为-1。

生成此重新分区的实现是由以下代码创建的:

    @Bean
    public BiFunction<KStream<String, Undo>, KStream<String, MyObjectUpdate>, KStream<String, UndoMyObject>> mergeUndoWithMyObjectUpdate() {
        return (undo, myObjectUpdate) ->
                undo.selectKey((k, v) -> v.getMyObjectDesignator())
                        .join(myObjectUpdate.selectKey((k, v) -> (v.getMyObject() == null ? null : v.getMyObject().getDesignator())),
                                this::execute,
                                JoinWindows.of(Duration.ofSeconds(3)),
                                StreamJoined.with(Serdes.String(),
                                        new JsonSerde<>(Undo.class),
                                        new JsonSerde<>(MyObjectUpdate.class)))
                        .filterNot((k, v) -> v == null)
                        .map((k, v) -> new KeyValue<>(v.getUndo().getId(), v))
                        .peek((k, v) -> {
                                    log.debug("Merged Undo with MyObjectUpdate -> {}, {}", v.getUndo().getId(), v.getAfterMyObject().getId());
                                    if (repository.getByKey(v.getUndo().getId()).isEmpty()) {
                                        undoStompController.send(v.getUndo().getId(), v.getChanges(), v.getUndo().getSessionId());
                                    }
                                }
                        );
    }

我最初的反应只是在内部主题中设置保留。

但我注意到它没有使用默认配置,而是被标记为 DYNAMIC_TOPIC_CONFIG(在 AKHQ 中)。经过一番研究后,我发现评论建议不要直接设置这些值并让 Kafka 自行处理保留。

我试图理解:

  1. 尽管计数为零,但可能导致主题不发布数据的原因是什么?
  2. 是否建议手动设置该主题的保留时间?

这是 AKHQ 的屏幕截图,显示了主题信息(我编辑了主题和消费者组名称):

感谢您为解决此 Kafka Streams 问题提供的任何见解或帮助。任何其他信息或建议将不胜感激。

apache-kafka apache-kafka-streams spring-cloud-stream
1个回答
0
投票

听起来像是权限问题?您需要为 Kafka Streams 提供正确的 ACL,特别是内部主题的“删除”权限:

来自 https://docs.confluence.io/platform/current/streams/developer-guide/security.html

# Allow Streams to read the input topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --topic input-topic1 --topic input-topic2

# Allow Streams to write to the output topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --topic output-topic1 --topic output-topic2

# Allow Streams to manage its own internal topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DELETE --operation WRITE --operation CREATE --resource-pattern-type prefixed --topic team1-streams-app1

# Allow Streams to manage its own consumer groups:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DESCRIBE --group team1-streams-app1

# Allow Streams EOS:
bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --operation DESCRIBE --transactional-id team1-streams-app1 --resource-pattern-type prefixed

重新分区主题配置了无限保留,以防止数据丢失,以防应用程序长时间离线(我们不希望保留启动,并删除未处理的数据)。

为了真正控制主题的大小,Kafka Streams 定期向代理发出“删除记录”请求,以清除已完全处理的旧数据。这个“删除记录”请求需要内部主题的“删除”权限,如果ACL配置不正确,可能会失败,导致无限增长。

© www.soinside.com 2019 - 2024. All rights reserved.