Apache Flink: how to close a fixed-size window when no data has been received for some time

Question (votes: 0, answers: 2)

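I am trying to compute the per-minute rate of incoming events from a Kafka topic, based on event time. I am using 1-minute `TumblingEventTimeWindows` for this; the code snippet is below. I have observed that if no events arrive for a particular window, say 2:34 to 2:35, then the previous window (2:33 to 2:34) does not close. I understand the risk of losing data for the 2:33 to 2:34 window (which could happen because of a system failure, large Kafka lag, and so on), but I cannot wait indefinitely. I need to close this window after waiting for a certain amount of time, and subsequent windows can continue once the system recovers. How can I achieve this?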

This is the code I am trying; for a continuous stream of events it produces the event count per minute.

    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,
            org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
    ));
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    executionEnvironment.setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "AllEventCountConsumerGroup");
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
    DataStreamSource<String> kafkaDataStream = executionEnvironment.addSource(kafkaConsumer);
    kafkaDataStream
            .flatMap(new EventFlatter())
            .filter(Objects::nonNull)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Entity>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
            .keyBy((KeySelector<Entity, String>) Entity::getTenant)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10))
            .aggregate(new EventCountAggregator())
            .addSink(eventRateProducer);
java apache-kafka apache-flink
2 Answers

2 votes

Given `forBoundedOutOfOrderness(Duration.ofSeconds(2))`, a window for the interval `[t, t + 1 minute)` will not close until an event with `timestamp >= t + 1 minute + 2 seconds` has been processed.
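The arithmetic behind that threshold can be checked in plain Java. This is a minimal sketch that assumes the documented behavior of Flink's bounded-out-of-orderness generator (watermark = highest timestamp seen, minus the bound, minus 1 ms) and of event-time windows (a window fires once the watermark reaches its last millisecond, `end - 1`). The class and method names are made up for illustration, and no Flink dependency is used.

```java
import java.time.Duration;

public class WindowCloseCheck {
    // Watermark a bounded-out-of-orderness generator would emit
    // after seeing maxTimestampSeen as the highest event timestamp.
    public static long watermark(long maxTimestampSeen, Duration bound) {
        return maxTimestampSeen - bound.toMillis() - 1;
    }

    // An event-time window [start, end) fires when the watermark reaches end - 1.
    public static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofSeconds(2);
        long windowEnd = Duration.ofMinutes(1).toMillis(); // window [0, 60_000)

        long justBefore = windowEnd + bound.toMillis() - 1; // 61_999
        long atThreshold = windowEnd + bound.toMillis();    // 62_000

        System.out.println(windowFires(windowEnd, watermark(justBefore, bound)));  // false
        System.out.println(windowFires(windowEnd, watermark(atThreshold, bound))); // true
    }
}
```

So with no event at or beyond `t + 1 minute + 2 seconds`, the watermark never reaches the end of the window and the window stays open.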

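If your input stream may be idle for long periods and you cannot wait for the stream to resume, you will have to either artificially advance the watermark after detecting idleness, or use a custom window `Trigger` that combines event-time and processing-time timers.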
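The second option comes down to one rule: fire the window when the watermark passes its end, or when a wall-clock deadline expires, whichever happens first. The plain-Java sketch below models only that decision and is not a Flink `Trigger`. In a real job the same rule would presumably live in a `Trigger` subclass that registers one event-time timer and one processing-time timer per window. `TimeoutWindowRule`, `firstElementSeenAt` and `maxWaitMillis` are hypothetical names, and starting the deadline at the first element of the window is an assumption.

```java
public class TimeoutWindowRule {
    private final long windowEnd; // exclusive end of the window, in event time (ms)
    private final long deadline;  // wall-clock time (ms) after which we stop waiting

    public TimeoutWindowRule(long windowEnd, long firstElementSeenAt, long maxWaitMillis) {
        this.windowEnd = windowEnd;
        // Assumption: the wait starts when the first element of the window arrives.
        this.deadline = firstElementSeenAt + maxWaitMillis;
    }

    // Fire if event time has passed the window, or if we have waited long enough.
    public boolean shouldFire(long currentWatermark, long currentProcessingTime) {
        boolean eventTimeDone = currentWatermark >= windowEnd - 1;
        boolean timedOut = currentProcessingTime >= deadline;
        return eventTimeDone || timedOut;
    }

    public static void main(String[] args) {
        // 1-minute window [0, 60_000); first element seen at wall-clock 1_000_000; wait at most 90 s.
        TimeoutWindowRule rule = new TimeoutWindowRule(60_000L, 1_000_000L, 90_000L);

        System.out.println(rule.shouldFire(45_000L, 1_030_000L)); // false: watermark behind, still waiting
        System.out.println(rule.shouldFire(59_999L, 1_030_000L)); // true: watermark reached the window end
        System.out.println(rule.shouldFire(45_000L, 1_090_000L)); // true: stream idle, deadline expired
    }
}
```

The wait has to be longer than the window itself, otherwise a healthy stream would be cut off before its watermark arrives.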

For a watermark generator that detects idleness, there is an example here, but it has not yet been updated to the new `WatermarkStrategy` API.


0 votes

Based on @david-anderson's suggestion, I implemented a custom watermark strategy that advances the watermark once idleness is detected.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.util.Preconditions;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class CustomBoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    private long maxTimestamp;
    private final long outOfOrdernessMillis;
    private long lastMaxTimestampCalculatedAt;
    private long sourceMaxIdlenessMillis;

    private long lastEmittedWatermark = Long.MIN_VALUE;

    public CustomBoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness, Duration sourceIdleness) {
        Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        Preconditions.checkNotNull(sourceIdleness, "sourceIdleness");
        Preconditions.checkArgument(!sourceIdleness.isNegative(), "sourceIdleness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.sourceMaxIdlenessMillis = sourceIdleness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + this.outOfOrdernessMillis + 1L;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Track the highest event timestamp seen so far.
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        }
        // Remember (in wall-clock time) when the last event arrived; used for idleness detection.
        lastMaxTimestampCalculatedAt = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Regular bounded-out-of-orderness watermark.
        long potentialWM = maxTimestamp - outOfOrdernessMillis - 1L;

        if (potentialWM > lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        } else if ((System.currentTimeMillis() - lastMaxTimestampCalculatedAt) > sourceMaxIdlenessMillis) {
            // Source is idle: round maxTimestamp down to a multiple of 10 seconds (10000 ms), then push
            // the watermark forward by outOfOrdernessMillis plus 1 ms so that pending windows can close.
            potentialWM = ((maxTimestamp / 10000) * 10000) + outOfOrdernessMillis + 1L;
            lastEmittedWatermark = Math.max(lastEmittedWatermark, potentialWM);
            System.out.println("onPeriodicEmit::lastEmittedWatermark:trailed " + Instant.ofEpochMilli(lastEmittedWatermark).atZone(ZoneOffset.UTC));
        }
        output.emitWatermark(new Watermark(this.lastEmittedWatermark));
    }
}
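
To see how far the idle branch moves the watermark, its arithmetic can be replayed in isolation. The sketch below copies the formula from `onPeriodicEmit` and assumes a 10-second out-of-orderness bound, positive timestamps, and the usual rule that an event-time window fires once the watermark reaches `end - 1`. `IdleWatermarkCheck` and its methods are hypothetical names used only for this check.

```java
import java.time.Duration;

public class IdleWatermarkCheck {
    // Same arithmetic as the idle branch of onPeriodicEmit.
    public static long idleWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return ((maxTimestamp / 10000) * 10000) + outOfOrdernessMillis + 1L;
    }

    // An event-time window [start, end) fires when the watermark reaches end - 1.
    public static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long outOfOrderness = Duration.ofSeconds(10).toMillis();
        long maxTimestamp = 25_000L; // last event seen at 00:00:25.000

        long wm = idleWatermark(maxTimestamp, outOfOrderness);
        System.out.println(wm); // 30001

        System.out.println(windowFires(30_000L, wm)); // true: the 10-second window [20s, 30s) closes
        System.out.println(windowFires(60_000L, wm)); // false: the 1-minute window [0s, 60s) stays open
    }
}
```

With these numbers the watermark lands just past the next 10-second boundary, which closes a 10-second window but not necessarily a 1-minute one. For longer windows the `10000` constant would presumably need to match the window size; that is an observation about the arithmetic, not a claim about the setup this answer was written for.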

Usage:

WatermarkStrategy<IBedRec> wm2Strategy = (ctx -> new CustomBoundedOutOfOrdernessWatermarks<IBedRec>(Duration.ofSeconds(10), Duration.ofSeconds(20)));
wm2Strategy = wm2Strategy
            .withTimestampAssigner((event, timestamp) -> event.getEventTimestampMillis())
            .withIdleness(Duration.ofSeconds(20));


DataStream<IBedRec> ibedRecords = env.fromSource(iBedRecSource, wm2Strategy,"source");

@david-anderson, please let me know if you have any suggestions for improvement.
