我有一个 Kafka Streams 应用程序,它从 Kafka 主题获取输入,在 5 分钟窗口内将其聚合到原始值的三个字段上。 在输出方面,我需要将聚合消息(其中键 = 复杂对象,值 = 聚合)转换为一条消息,其中键 = 复杂对象中的字段之一,而值是复杂对象 + 聚合进入一个新的领域。 最后,该输出通过 Iceberg Kafka 连接器保存到 Iceberg 表中。 由于这部分需要翻译,Iceberg 连接器只能持久化值对象而不是键。
当数据到达 Iceberg 连接器时,我看到重复的条目,例如同一时间窗口有 4 个或 6 个值。 我已经排除了迟到消息的原因,因为我已经验证了测试环境中存在这种不一致的输出,所以我确信这是我的代码中的错误。 复制发生在它撞上冰山之前。
对于 Kafka Streams,我是个新手。 我的理解是 RocksDB 用作中间存储,并且在其他 Kafka 主题中管理多个状态存储。 结果是,直到窗口过去(并且宽限期过去)后,最终消息才会写入目标主题。
哪些原因会导致重复输出消息?
使用 Kafka Streams 窗口聚合,默认情况下它将发出中间结果,而不是等到窗口关闭才发出最终结果。为了让 Kafka Streams 发出最终结果,您需要像这样设置 EmitStrategy.onWindowClose():
builder.stream(topic,....))
.groupByKey(....))
.windowedBy(TimeWindows.ofSizeAndNoGrace(Duration.ofMinutes(5)))
.emitStrategy(EmitStrategy.onWindowClose())
.aggregate(....)
.....
您可以在调用 KStream.groupByX().windowedBy(..)..
产生的TimeWindowedKStream 对象上设置发出策略
HTH,让我知道进展如何。