我已经在 Kafka 流应用程序中编写了这段代码:
KGroupedStream<String, foo> groupedStream = stream.groupByKey();
groupedStream.windowedBy(
SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
.aggregate(() -> {...})
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()...
(如果我理解正确的话)应该在窗口关闭后为每个键发出记录。 不知何故,行为如下:
流不会发出第一条记录,即使使用不同的密钥,也只会在第二条记录之后转发它,然后第二条记录仅在第三条记录之后发出,依此类推..
我已经尝试了多个带有“exactly_once”的 StreamConfig,并且无论有或没有缓存,这种行为仍然存在。
预先感谢您的帮助!
这是预期的行为。请注意,
suppress()
基于事件时间。因此,只要没有新数据到达,时间就无法前进,因此提前驱逐记录将是错误的,因为不能保证下一条记录可能属于当前窗口。
如果您的流没有连续接收数据,并且可能有一段时间不活动,但您仍然希望会话窗口聚合正常工作,您可以实现标点符号来生成心跳事件。
这是一个示例代码:
@Slf4j
public class HeartbeatProcessor implements Processor<String, Long, String, Long> {
private ProcessorContext<String, Long> context;
private static final AtomicLong lastExecutionTime = new AtomicLong(0);
@Override
public void init(ProcessorContext<String, Long> context) {
this.context = context;
context.schedule(
Duration.ofSeconds(1),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> {
long currentTime = System.currentTimeMillis();
long lastTime = lastExecutionTime.get();
if (currentTime - lastTime >= 1000) {
if (lastExecutionTime.compareAndSet(lastTime, currentTime)) {
context.forward(new Record<>("heartbeat", currentTime, timestamp));
}
}
}
);
}
@Override
public void process(Record<String, Long> record) {
context.forward(record);
}
}
将此处理器添加到您的输入 KStream:
inputKStream.process(HeartbeatProcessor::new)
.yourNextMethod()
...
重要提示: 如果您的 Kafka Streams 应用程序在多个实例上运行,则上述操作可能会同时发生多次。 为了解决这个问题,您可能需要考虑一些技术,例如根据分区编号设置不同的标点符号间隔。
我认为带有“suppress()”的“Session Window”不会生成任何输出。
如有错误请指正。据我所知,suppress() 仅适用于基于时间的 Windows,不适用于基于会话的 Windows。