具有逐步限制键的 KafkaStream 聚合

问题描述 投票:0回答:1

我正在努力解决 kafkastream 聚合背后的逻辑。 我有 A_B_C 形式的字符串键和基本上是 int 值的值的记录,我想通过获取组中值的最大值,然后从这些最大值中获取不同值来逐步聚合键。我将在下面留下一个例子,希望有助于理解我的需要。 真正不理解的是聚合器的逻辑。我理解允许更新最终输出的底层状态存储的概念(在 KGroupedTable 的情况下),但我不明白在分组之前更改密钥(基本上被截断)的情况下它的行为方式。我希望下面的例子有助于澄清我的意思。 我想知道的是这件事的可行性,如果有人已经运行过类似的事情,解决方案是什么

在一个主题中,我有这样的记录,键是唯一的,因为该主题是压缩的并且首先作为表加载。

价值
A_B_C 1
A_B_D 1
A_B_E 3
A_G_F 2
A_L_M 1

我希望第一个聚合获得每个子键的最大值

价值
A_B 3
A_G 2
A_L 1

然后获取按最终子键分组的不同值

价值
A (1,2,3)

此外,我收到墓碑事件,因此以我收到的情况为例

价值
A_B_E

考虑到上述所有其他记录,聚合应变为

价值
A_B 2
A_G 2
A_L 1

最后

价值
A (1,2)

编辑:表格格式

java apache-kafka aggregate apache-kafka-streams
1个回答
0
投票

您尝试做的事情当然是可能的,但是,您需要考虑一些事情。

您想要计算的

max
聚合不是“可减的”,即,给定当前最大值,如果删除新值,您不能只计算新的最大值,而是需要进入的所有max
功能。

Kafka Streams 中的聚合,仅存储当前结果,并在“添加新值”和“删除旧值”时更新结果。例如,

sum

,这效果很好。如果当前总和是 10,加上 2,则变为 12,如果除去 2,则可以计算 8 作为新总和。但是,对于 
max
,如果当前最大值为 10,并且删除了 10,则无法计算新的最大值(如果添加 8,您将能够计算新的最大值,因为 10 仍然是最大值,或者如果添加 12 ,因为 12 将成为新的最大值)。

因此,您需要进行“两步”聚合:第一步,将“收集”所有值(例如,在列表中——Kafka Streams 已经提供了您可以使用的

ListSerdes

),第二步需要用于计算 
max
 的列表。这样,当删除某个值时,您可以将其从列表中删除,第二步可以使用更新的列表作为输入来计算新的最大值:

KTable<String, Integer> input = builder.table(...); KTable<String, List<Integer>> valueList = input.groupBy(/* set first sub-key */) .aggregate(/* maintain a list of values */); // for the remove step, just scan the list, // and remove the first value which matches the removed value KTable<String, Integer> max = valueList.mapValue(/* scan the list, find the max, and return it*/);
对于不同的步骤,您将需要执行相同的操作。首先收集所有值(包括重复项)的列表,然后使用 

mapValues()

 删除重复项以获得最终结果。

注意:在

List

 中保留重复项对于计算正确的结果在这两个步骤中都很重要。

© www.soinside.com 2019 - 2024. All rights reserved.