如何在特定时间范围内聚合KStream到固定大小的列表?

问题描述 投票:0回答:1

考虑这个 KStream:

KStream<String, String> inputStream = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String()));
Materialized<String, List<String>, WindowStore<Bytes, byte[]>> with = Materialized.with(Serdes.String(), STRING_LIST_SERDE);

KStream<Windowed<String>, List<String>> outputStream = inputStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2)))
            .aggregate(
                    ArrayList::new,
                    (key, string, aggregate) -> {
                        aggregate.add(string);
                        return aggregate;
                    }, with)
            .toStream();

outputStream
将聚合来自
inputStream
info 的所有消息,定义时间范围内的消息数组。 另外,现在我想将消息聚合到特定限制,例如直到列表大小不超过 50 条。 如果列表在聚合过程中变得大于 50,我想以某种方式将其拆分为附加列表。

基本上,我希望实现的输出是获取一组消息,其大小达到限制(例如 50 条),并且达到特定的时间范围,以先到者为准。

为了实现这一目标,我在这里缺少什么?

java spring spring-boot apache-kafka apache-kafka-streams
1个回答
1
投票

您可以尝试将

KTable
转换为
KStream
并执行
flatMapValues
来拆分列表,如下所示(Kotlin 中的代码):

val s = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String()))
val output = s
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2)))
    .aggregate({ mutableListOf() }, { k: String, str: String, agg: List<String> -> agg.plus(str) })
    .toStream()
    .flatMapValues { strList -> strList.chunked(50) }

但这意味着您将整个聚合列表加载到内存中 - 可能会也可能不会是一个问题,具体取决于列表大小和您的内存设置,但这绝对是需要记住的事情。

© www.soinside.com 2019 - 2024. All rights reserved.