如何从Kafka全局状态存储中删除记录?

问题描述 投票:0回答:1

全局状态存储在恢复过程中,会将源主题(全局存储的变化日志主题)的数据转储。

删除一条记录时,我的操作如下

kvStore.put("key-1",null)

Kafka如何知道该记录已被删除,并在恢复时从源主题中转储记录(考虑到源主题有一个key-1的记录)。

在我的拓扑结构中,我有

  • 输入主题 -> T1
  • 并附加一个过程,从T1读取数据,并从记录中构建一个密钥,并向下转发到主题T2。
  • 和话题T2是全局状态存储的源话题。

例如

  • T1我得到了数据。{"id":'123', "name":"Mohit", "type":"insert"}
  • 构造一个记录的键,并转发Down到主题,键和值为T2 -> 键: 123 和值。{"id":'123', "name":"Mohit"}

同样的key记录在data.T1得到数据后,以删除类型出现。{"id":'123', "name":"Mohit", "type":"insert"}

所以我把记录转发成这样

this.context.forward(key, null)
key: 123 value:null

相同的是,在国家商店更新

我只是想知道,在恢复过程中,这个记录将被删除,意味着我得到空的,如果我去上存储与键 123.

apache-kafka kafka-consumer-api apache-kafka-streams kafka-producer-api
1个回答
1
投票

状态存储的变更记录是 紧凑 主题。要从压缩的主题中删除信息,你需要做一个 put(key, null) 操作。一个带有 null 值称为 墓碑 它将 最终 被主题清理器删除。

注意,该消息只会在状态存储中被删除(最终),而不会在输入主题中被删除。

最后,键为 123 应从国库中彻底清除。

© www.soinside.com 2019 - 2024. All rights reserved.