这是一个后续问题:qazxsw poi
我在流中存储每个传入元素的状态,在计时器关闭后,我删除了状态。这样我就可以防止重复处理,直到元素超时,之后我可以再次处理相同的元素。一世
我已经编写了以下代码来测试计时器,但似乎在所有3个元素都经历了第一个Trigger when State expires之后触发了计时器。
ProcessFunction
我有一个包含3个重复元素的输入列表。在第一个public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
streamEnv.setParallelism(12);
List<Tuple2<String, String>> inputList = new ArrayList<>();
inputList.add(new Tuple2<>("Test", "test"));
inputList.add(new Tuple2<>("Test", "test"));
inputList.add(new Tuple2<>("Test", "test"));
streamEnv.fromCollection(inputList).keyBy(0)
.process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
ValueState<Integer> occur;
@Override
public void open(Configuration parameters) throws Exception {
occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0));
}
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
if (occur.value() < 2) {
occur.update(occur.value() + 1);
out.collect(value);
LOGGER.info("[TEST] Outputting Tuple {}", value);
}
else {
Thread.sleep(10000);
LOGGER.info("[TEST] Outputting Tuple {}", value);
out.collect(value);
}
}
})
.keyBy(0)
.process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
ValueState<Tuple2<String, String>> storedTuple;
@Override
public void open(Configuration parameters) throws Exception {
storedTuple = getRuntimeContext().getState(new ValueStateDescriptor<>("storedTuple",
TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
}
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
Tuple2<String, String> stored = storedTuple.value();
if (stored == null) {
LOGGER.info("[TEST] Storing Tuple {}", value);
storedTuple.update(value);
out.collect(value);
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 6000);
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
LOGGER.info("[TEST] Removing Tuple {}", storedTuple.value());
storedTuple.clear();
}
)
.addSink(new CollectSink());
streamEnv.execute("Testing");
for (Tuple2<String, String> tup: CollectSink.values) {
System.out.println(tup);
}
}
private static class CollectSink implements SinkFunction<Tuple2<String, String>> {
static final List<Tuple2<String, String>> values = new ArrayList<>();
@Override
public synchronized void invoke(Tuple2<String, String> value) throws Exception {
values.add(value);
}
}
中,我按原样发送前两个元素,但将第三个元素延迟10秒。
在第二个ProcessFunction
中,它根据是否为其存储状态来过滤元素。正如所料,第一个元素被存储并向前发送,第二个元素不是状态已经存在。对于第一个元素,除了发送它之外,我还设置了一个6秒的定时器,以便在触发定时器后清除状态。
现在第三个元素在10秒后发送,这意味着6秒触发器应该已经清除了状态。但是,在触发计时器之前也会处理第三个元素。我也可以看到输出只包含1个副本的元组,即使我预计有2个副本。
我添加了一些日志记录,以便更好地了解执行时间。
ProcessFunction
您可以看到前两个元组按预期一起发出,然后是10秒延迟,之后发出第3个元组。现在[2019-02-19 14:11:48,891] [Process (1/12)] INFO FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,891] [Process (1/12)] INFO FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,943] [Process -> Sink: Unnamed (1/12)] INFO FlinkTest - [TEST] Storing Tuple (Test,test)
[2019-02-19 14:11:58,891] [Process (1/12)] INFO FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:58,896] [Process -> Sink: Unnamed (1/12)] INFO FlinkTest - [TEST] Removing Tuple (Test,test)
发生在10秒之后,即使它在第一个元组进入6秒后被触发。
在处理大于计时器中指定的时间的水印之前,事件时间计时器不会触发。直到处理完第三个事件之后才能发生这样的水印。此外,对于摄取时间,使用周期性水印生成器生成水印,并且默认情况下每200毫秒插入流中。