Kafka Stream 抑制会话窗口聚合

问题描述 投票:0回答:3

我已经在 Kafka 流应用程序中编写了这段代码:

KGroupedStream<String, foo> groupedStream = stream.groupByKey();
groupedStream.windowedBy(
SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
    .aggregate(() -> {...})
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()...

(如果我理解正确的话)应该在窗口关闭后为每个键发出记录。 不知何故,行为如下:

流不会发出第一条记录,即使使用不同的密钥,也只会在第二条记录之后转发它,然后第二条记录仅在第三条记录之后发出,依此类推..

我已经尝试了多个带有“exactly_once”的 StreamConfig,并且无论有或没有缓存,这种行为仍然存在。

预先感谢您的帮助!

apache-kafka apache-kafka-streams window-functions suppress
3个回答
20
投票

这是预期的行为。请注意,

suppress()
基于事件时间。因此,只要没有新数据到达,时间就无法前进,因此提前驱逐记录将是错误的,因为不能保证下一条记录可能属于当前窗口。


0
投票

如果您的流没有连续接收数据,并且可能有一段时间不活动,但您仍然希望会话窗口聚合正常工作,您可以实现标点符号来生成心跳事件。

这是一个示例代码:

@Slf4j
public class HeartbeatProcessor implements Processor<String, Long, String, Long> {
    private ProcessorContext<String, Long> context;
    private static final AtomicLong lastExecutionTime = new AtomicLong(0);
    @Override
    public void init(ProcessorContext<String, Long> context) {
        this.context = context;
        context.schedule(
            Duration.ofSeconds(1),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                long currentTime = System.currentTimeMillis();
                long lastTime = lastExecutionTime.get();
                if (currentTime - lastTime >= 1000) {
                    if (lastExecutionTime.compareAndSet(lastTime, currentTime)) {
                        context.forward(new Record<>("heartbeat", currentTime, timestamp));
                    }
                }
            }
        );
    }
    @Override
    public void process(Record<String, Long> record) {
        context.forward(record);
    }
}

将此处理器添加到您的输入 KStream:

inputKStream.process(HeartbeatProcessor::new)
    .yourNextMethod()
    ...

重要提示: 如果您的 Kafka Streams 应用程序在多个实例上运行,则上述操作可能会同时发生多次。 为了解决这个问题,您可能需要考虑一些技术,例如根据分区编号设置不同的标点符号间隔。


-1
投票

我认为带有“suppress()”的“Session Window”不会生成任何输出。

如有错误请指正。据我所知,suppress() 仅适用于基于时间的 Windows,不适用于基于会话的 Windows。

© www.soinside.com 2019 - 2024. All rights reserved.