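I am testing Flink CEP with a pattern that matches a single event containing the string "fail". I send only one event and expect a result immediately. In the code below I assign watermarks in two ways: WatermarkStrategy and BoundedOutOfOrdernessTimestampExtractor. I am using Flink 1.16.1.
I noticed that with WatermarkStrategy the match does not fire immediately; it usually takes a second event to trigger it. With BoundedOutOfOrdernessTimestampExtractor, however, the match fires as expected. Here is my code: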
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    KeyedStream<Tuple3<Long, String, String>, String> stream = env
            .addSource(new SourceFunction<Tuple3<Long, String, String>>() {
                @Override
                public void run(SourceContext<Tuple3<Long, String, String>> ctx) throws Exception {
                    while (true) {
                        ctx.collect(Tuple3.of(1000L, "a", "fail"));
                        TimeUnit.SECONDS.sleep(1);
                        // ctx.collect(Tuple3.of(2000L, "a", "fail"));
                        TimeUnit.HOURS.sleep(1);
                    }
                }

                @Override
                public void cancel() {
                }
            })
            // Approach 1: WatermarkStrategy
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                            .withTimestampAssigner((SerializableTimestampAssigner<Tuple3<Long, String, String>>) (tuple3, l) -> tuple3.f0
                            )
            )
            // Approach 2: BoundedOutOfOrdernessTimestampExtractor
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, String>>(Time.of(0, TimeUnit.SECONDS)) {
                @Override
                public long extractTimestamp(Tuple3<Long, String, String> element) {
                    return element.f0;
                }
            })
            .keyBy((KeySelector<Tuple3<Long, String, String>, String>) value -> value.f1);
    Pattern<Tuple3<Long, String, String>, Tuple3<Long, String, String>> pattern = Pattern.<Tuple3<Long, String, String>>begin("first")
            .where(new SimpleCondition<>() {
                @Override
                public boolean filter(Tuple3<Long, String, String> tuple3) {
                    return tuple3.f2.equals("fail");
                }
            }).within(Time.of(10, TimeUnit.SECONDS));
    PatternStream<Tuple3<Long, String, String>> patternStream = CEP.pattern(stream, pattern);
    patternStream.select((PatternSelectFunction<Tuple3<Long, String, String>, String>) map -> {
        System.out.println("trigger:" + map.values().size());
        return "trigger";
    }).print("warning");
    env.execute();
}
Why does WatermarkStrategy fail to trigger the match immediately, while BoundedOutOfOrdernessTimestampExtractor works as expected? How can I get immediate triggering with WatermarkStrategy?
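WatermarkStrategy.forBoundedOutOfOrderness uses BoundedOutOfOrdernessWatermarks, whose onPeriodicEmit emits new Watermark(maxTimestamp - outOfOrdernessMillis - 1). BoundedOutOfOrdernessTimestampExtractor's getCurrentWatermark returns new Watermark(lastEmittedWatermark), which is derived from the maximum timestamp minus the out-of-orderness with no extra - 1. The difference comes from that - 1: with a single event at timestamp 1000 and zero out-of-orderness, the WatermarkStrategy watermark is 999, which is still below the event's timestamp, so CEP keeps the event buffered until a later event advances the watermark to 1000 or beyond. The legacy extractor emits a watermark of 1000, so the match fires right away.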
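
To make the off-by-one concrete, here is a minimal, Flink-free sketch of the arithmetic. The class and method names (WatermarkOffByOne, strategyWatermark, legacyWatermark, isReleased) are hypothetical and exist only for illustration. The release check is a simplified assumption about how CEP hands buffered events to the pattern once the watermark reaches their timestamp, and the legacy formula leaves out the extractor's bookkeeping that keeps watermarks from going backwards.

```java
public class WatermarkOffByOne {

    // Formula quoted above for BoundedOutOfOrdernessWatermarks.onPeriodicEmit.
    public static long strategyWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Simplified legacy formula: same subtraction, but without the extra "- 1".
    public static long legacyWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis;
    }

    // Assumption: a buffered event is processed once watermark >= event timestamp.
    public static boolean isReleased(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long eventTimestamp = 1000L; // the single event from the question
        long outOfOrderness = 0L;    // Duration.ofSeconds(0)

        long viaStrategy = strategyWatermark(eventTimestamp, outOfOrderness);
        long viaLegacy = legacyWatermark(eventTimestamp, outOfOrderness);

        System.out.println("WatermarkStrategy watermark=" + viaStrategy
                + " released=" + isReleased(eventTimestamp, viaStrategy));
        System.out.println("Legacy extractor watermark=" + viaLegacy
                + " released=" + isReleased(eventTimestamp, viaLegacy));

        // A second event at 2000 moves the WatermarkStrategy watermark past 1000.
        long afterSecondEvent = strategyWatermark(2000L, outOfOrderness);
        System.out.println("After second event watermark=" + afterSecondEvent
                + " released=" + isReleased(eventTimestamp, afterSecondEvent));
    }
}
```

If immediate triggering is required with WatermarkStrategy, one possible direction is a custom generator supplied through WatermarkStrategy.forGenerator that emits the maximum timestamp without subtracting 1. Treat that as an untested suggestion: a watermark equal to the latest timestamp declares that timestamp complete, so a later event carrying the same timestamp may be treated as late.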