考虑这个 KStream:
KStream<String, String> inputStream = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String()));
Materialized<String, List<String>, WindowStore<Bytes, byte[]>> with = Materialized.with(Serdes.String(), STRING_LIST_SERDE);
KStream<Windowed<String>, List<String>> outputStream = inputStream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2)))
.aggregate(
ArrayList::new,
(key, string, aggregate) -> {
aggregate.add(string);
return aggregate;
}, with)
.toStream();
outputStream
将聚合来自 inputStream
info 的所有消息,定义时间范围内的消息数组。
另外,现在我想将消息聚合到特定限制,例如直到列表大小不超过 50 条。
如果列表在聚合过程中变得大于 50,我想以某种方式将其拆分为附加列表。
基本上,我希望实现的输出是获取一组消息,其大小达到限制(例如 50 条),并且达到特定的时间范围,以先到者为准。
为了实现这一目标,我在这里缺少什么?
您可以尝试将
KTable
转换为 KStream
并执行 flatMapValues
来拆分列表,如下所示(Kotlin 中的代码):
val s = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String()))
val output = s
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2)))
.aggregate({ mutableListOf() }, { k: String, str: String, agg: List<String> -> agg.plus(str) })
.toStream()
.flatMapValues { strList -> strList.chunked(50) }
但这意味着您将整个聚合列表加载到内存中 - 可能会也可能不会是一个问题,具体取决于列表大小和您的内存设置,但这绝对是需要记住的事情。