查看在流应用程序的输出中创建的重复记录

问题描述 投票:0回答:1

我有一个 Kafka Streams 应用程序,它从 Kafka 主题获取输入,在 5 分钟窗口内将其聚合到原始值的三个字段上。 在输出方面,我需要将聚合消息(其中键 = 复杂对象,值 = 聚合)转换为一条消息,其中键 = 复杂对象中的字段之一,而值是复杂对象 + 聚合进入一个新的领域。 最后,该输出通过 Iceberg Kafka 连接器保存到 Iceberg 表中。 由于这部分需要翻译,Iceberg 连接器只能持久化值对象而不是键。

当数据到达 Iceberg 连接器时,我看到重复的条目,例如同一时间窗口有 4 个或 6 个值。 我已经排除了迟到消息的原因,因为我已经验证了测试环境中存在这种不一致的输出,所以我确信这是我的代码中的错误。 复制发生在它撞上冰山之前。

对于 Kafka Streams,我是个新手。 我的理解是 RocksDB 用作中间存储,并且在其他 Kafka 主题中管理多个状态存储。 结果是,直到窗口过去(并且宽限期过去)后,最终消息才会写入目标主题。

哪些原因会导致重复输出消息?

apache-kafka apache-kafka-streams apache-kafka-connect apache-iceberg
1个回答
0
投票

使用 Kafka Streams 窗口聚合,默认情况下它将发出中间结果,而不是等到窗口关闭才发出最终结果。为了让 Kafka Streams 发出最终结果,您需要像这样设置 EmitStrategy.onWindowClose()

builder.stream(topic,....))
        .groupByKey(....))
        .windowedBy(TimeWindows.ofSizeAndNoGrace(Duration.ofMinutes(5)))
        .emitStrategy(EmitStrategy.onWindowClose())
        .aggregate(....)
       .....

您可以在调用 KStream.groupByX().windowedBy(..)..

 产生的 
TimeWindowedKStream

对象上设置发出策略

HTH,让我知道进展如何。

© www.soinside.com 2019 - 2024. All rights reserved.