Flink计时器未按时执行

问题描述 投票:0回答:1

这是一个后续问题:qazxsw poi

我在流中存储每个传入元素的状态,在计时器关闭后,我删除了状态。这样我就可以防止重复处理,直到元素超时,之后我可以再次处理相同的元素。一世

我已经编写了以下代码来测试计时器,但似乎在所有3个元素都经历了第一个Trigger when State expires之后触发了计时器。

ProcessFunction

我有一个包含3个重复元素的输入列表。在第一个public static void main(String[] args) throws Exception { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); streamEnv.setParallelism(12); List<Tuple2<String, String>> inputList = new ArrayList<>(); inputList.add(new Tuple2<>("Test", "test")); inputList.add(new Tuple2<>("Test", "test")); inputList.add(new Tuple2<>("Test", "test")); streamEnv.fromCollection(inputList).keyBy(0) .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() { ValueState<Integer> occur; @Override public void open(Configuration parameters) throws Exception { occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0)); } @Override public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception { if (occur.value() < 2) { occur.update(occur.value() + 1); out.collect(value); LOGGER.info("[TEST] Outputting Tuple {}", value); } else { Thread.sleep(10000); LOGGER.info("[TEST] Outputting Tuple {}", value); out.collect(value); } } }) .keyBy(0) .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() { ValueState<Tuple2<String, String>> storedTuple; @Override public void open(Configuration parameters) throws Exception { storedTuple = getRuntimeContext().getState(new ValueStateDescriptor<>("storedTuple", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}))); } @Override public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception { Tuple2<String, String> stored = storedTuple.value(); if (stored == null) { LOGGER.info("[TEST] Storing Tuple {}", value); storedTuple.update(value); out.collect(value); ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 6000); } } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception { LOGGER.info("[TEST] Removing Tuple {}", storedTuple.value()); storedTuple.clear(); } ) .addSink(new CollectSink()); streamEnv.execute("Testing"); for (Tuple2<String, String> tup: CollectSink.values) { System.out.println(tup); } } private static class CollectSink implements SinkFunction<Tuple2<String, String>> { static final List<Tuple2<String, String>> values = new ArrayList<>(); @Override public synchronized void invoke(Tuple2<String, String> value) throws Exception { values.add(value); } } 中,我按原样发送前两个元素,但将第三个元素延迟10秒。

在第二个ProcessFunction中,它根据是否为其存储状态来过滤元素。正如所料,第一个元素被存储并向前发送,第二个元素不是状态已经存在。对于第一个元素,除了发送它之外,我还设置了一个6秒的定时器,以便在触发定时器后清除状态。

现在第三个元素在10秒后发送,这意味着6秒触发器应该已经清除了状态。但是,在触发计时器之前也会处理第三个元素。我也可以看到输出只包含1个副本的元组,即使我预计有2个副本。

我添加了一些日志记录,以便更好地了解执行时间。

ProcessFunction

您可以看到前两个元组按预期一起发出,然后是10秒延迟,之后发出第3个元组。现在[2019-02-19 14:11:48,891] [Process (1/12)] INFO FlinkTest - [TEST] Outputting Tuple (Test,test) [2019-02-19 14:11:48,891] [Process (1/12)] INFO FlinkTest - [TEST] Outputting Tuple (Test,test) [2019-02-19 14:11:48,943] [Process -> Sink: Unnamed (1/12)] INFO FlinkTest - [TEST] Storing Tuple (Test,test) [2019-02-19 14:11:58,891] [Process (1/12)] INFO FlinkTest - [TEST] Outputting Tuple (Test,test) [2019-02-19 14:11:58,896] [Process -> Sink: Unnamed (1/12)] INFO FlinkTest - [TEST] Removing Tuple (Test,test) 发生在10秒之后,即使它在第一个元组进入6秒后被触发。

java apache-flink flink-streaming
1个回答
2
投票

在处理大于计时器中指定的时间的水印之前,事件时间计时器不会触发。直到处理完第三个事件之后才能发生这样的水印。此外,对于摄取时间,使用周期性水印生成器生成水印,并且默认情况下每200毫秒插入流中。

© www.soinside.com 2019 - 2024. All rights reserved.