我正在努力解决 kafkastream 聚合背后的逻辑。 我有 A_B_C 形式的字符串键和基本上是 int 值的值的记录,我想通过获取组中值的最大值,然后从这些最大值中获取不同值来逐步聚合键。我将在下面留下一个例子,希望有助于理解我的需要。 真正不理解的是聚合器的逻辑。我理解允许更新最终输出的底层状态存储的概念(在 KGroupedTable 的情况下),但我不明白在分组之前更改密钥(基本上被截断)的情况下它的行为方式。我希望下面的例子有助于澄清我的意思。 我想知道的是这件事的可行性,如果有人已经运行过类似的事情,解决方案是什么
在一个主题中,我有这样的记录,键是唯一的,因为该主题是压缩的并且首先作为表加载。
键 | 价值 |
---|---|
A_B_C | 1 |
A_B_D | 1 |
A_B_E | 3 |
A_G_F | 2 |
A_L_M | 1 |
我希望第一个聚合获得每个子键的最大值
键 | 价值 |
---|---|
A_B | 3 |
A_G | 2 |
A_L | 1 |
然后获取按最终子键分组的不同值
键 | 价值 |
---|---|
A | (1,2,3) |
此外,我收到墓碑事件,因此以我收到的情况为例
键 | 价值 |
---|---|
A_B_E | 空 |
考虑到上述所有其他记录,聚合应变为
键 | 价值 |
---|---|
A_B | 2 |
A_G | 2 |
A_L | 1 |
最后
键 | 价值 |
---|---|
A | (1,2) |
编辑:表格格式
您尝试做的事情当然是可能的,但是,您需要考虑一些事情。
您想要计算的
max
聚合不是“可减的”,即,给定当前最大值,如果删除新值,您不能只计算新的最大值,而是需要进入的所有值max
功能。Kafka Streams 中的聚合,仅存储当前结果,并在“添加新值”和“删除旧值”时更新结果。例如,
sum
,这效果很好。如果当前总和是 10,加上 2,则变为 12,如果除去 2,则可以计算 8 作为新总和。但是,对于
max
,如果当前最大值为 10,并且删除了 10,则无法计算新的最大值(如果添加 8,您将能够计算新的最大值,因为 10 仍然是最大值,或者如果添加 12 ,因为 12 将成为新的最大值)。因此,您需要进行“两步”聚合:第一步,将“收集”所有值(例如,在列表中——Kafka Streams 已经提供了您可以使用的
ListSerdes
),第二步需要用于计算
max
的列表。这样,当删除某个值时,您可以将其从列表中删除,第二步可以使用更新的列表作为输入来计算新的最大值:
KTable<String, Integer> input = builder.table(...);
KTable<String, List<Integer>> valueList =
input.groupBy(/* set first sub-key */)
.aggregate(/* maintain a list of values */);
// for the remove step, just scan the list,
// and remove the first value which matches the removed value
KTable<String, Integer> max =
valueList.mapValue(/* scan the list, find the max, and return it*/);
对于不同的步骤,您将需要执行相同的操作。首先收集所有值(包括重复项)的列表,然后使用 mapValues()
删除重复项以获得最终结果。注意:在
List
中保留重复项对于计算正确的结果在这两个步骤中都很重要。