WatermarkStrategy does not trigger CEP events immediately in Flink 1.16.1

Problem description

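I am testing Flink CEP with a pattern that matches a single event containing the string "fail". I send only one event and expect a result immediately. In the code below I use two ways to assign watermarks: WatermarkStrategy and BoundedOutOfOrdernessTimestampExtractor. I am on Flink 1.16.1.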

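I noticed that with WatermarkStrategy the match is not emitted immediately; it usually takes a second event to trigger it. With BoundedOutOfOrdernessTimestampExtractor, however, the match fires as expected. Here is my code: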

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        KeyedStream<Tuple3<Long, String, String>, String> stream = env
                .addSource(new SourceFunction<Tuple3<Long, String, String>>() {
                    @Override
                    public void run(SourceContext<Tuple3<Long, String, String>> ctx) throws Exception {
                        while (true) {
                            ctx.collect(Tuple3.of(1000L, "a", "fail"));
                            TimeUnit.SECONDS.sleep(1);
                            // ctx.collect(Tuple3.of(2000L, "a", "fail"));
                            TimeUnit.HOURS.sleep(1);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                })
                // Variant 1: the WatermarkStrategy API (the match is delayed with this one).
                // Enable only one of the two variants per run when comparing them.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner((SerializableTimestampAssigner<Tuple3<Long, String, String>>) (tuple3, l) -> tuple3.f0
                                )
                )
                // Variant 2: the legacy extractor (the match fires as expected with this one).
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, String>>(Time.of(0, TimeUnit.SECONDS)) {
                    @Override
                    public long extractTimestamp(Tuple3<Long, String, String> element) {
                        return element.f0;
                    }
                })

                .keyBy((KeySelector<Tuple3<Long, String, String>, String>) value -> value.f1);

        Pattern<Tuple3<Long, String, String>, Tuple3<Long, String, String>> pattern = Pattern.<Tuple3<Long, String, String>>begin("first")
                .where(new SimpleCondition<>() {
                    @Override
                    public boolean filter(Tuple3<Long, String, String> tuple3) {
                        return tuple3.f2.equals("fail");
                    }
                }).within(Time.of(10, TimeUnit.SECONDS));


        PatternStream<Tuple3<Long, String, String>> patternStream = CEP.pattern(stream, pattern);
        patternStream.select((PatternSelectFunction<Tuple3<Long, String, String>, String>) map -> {
            System.out.println("trigger:" + map.values().size());
            return "trigger";
        }).print("warning");
        env.execute();
    }

Why does WatermarkStrategy fail to trigger the event immediately while BoundedOutOfOrdernessTimestampExtractor works as expected? How can I get immediate triggering with WatermarkStrategy?

apache-flink flink-cep
1 answer

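WatermarkStrategy.forBoundedOutOfOrderness uses BoundedOutOfOrdernessWatermarks, whose onPeriodicEmit emits new Watermark(maxTimestamp - outOfOrdernessMillis - 1). BoundedOutOfOrdernessTimestampExtractor's getCurrentWatermark returns new Watermark(lastEmittedWatermark), with no "- 1". The difference in behavior comes from that "- 1" on the watermark: for the single event at timestamp 1000 with zero out-of-orderness, the first emits a watermark of 999 and the second emits 1000, and CEP only processes the event at timestamp 1000 once the watermark has reached 1000.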
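
To make the arithmetic concrete, here is a minimal, Flink-free sketch of the two formulas from the answer. The class WatermarkModel and its method names are hypothetical and are not part of Flink. The release rule (a buffered event is handed to the pattern once the watermark is at or past its timestamp) is an assumption about how event-time CEP behaves, and the legacy extractor's extra bookkeeping that keeps lastEmittedWatermark from decreasing is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of the two watermark formulas; not Flink code. */
final class WatermarkModel {

    /** Formula quoted for BoundedOutOfOrdernessWatermarks.onPeriodicEmit. */
    static long strategyWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** Formula behind BoundedOutOfOrdernessTimestampExtractor (no "- 1"). */
    static long legacyWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis;
    }

    /** Assumed release rule: an event leaves the buffer once watermark >= its timestamp. */
    static List<Long> released(List<Long> bufferedTimestamps, long watermark) {
        List<Long> out = new ArrayList<>();
        for (long ts : bufferedTimestamps) {
            if (ts <= watermark) {
                out.add(ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> oneEvent = Arrays.asList(1000L);

        // Single event at 1000, zero out-of-orderness, as in the question.
        long wmStrategy = strategyWatermark(1000L, 0L); // 999
        long wmLegacy = legacyWatermark(1000L, 0L);     // 1000
        System.out.println("strategy: wm=" + wmStrategy + " released=" + released(oneEvent, wmStrategy));
        System.out.println("legacy:   wm=" + wmLegacy + " released=" + released(oneEvent, wmLegacy));

        // A second event at 2000 moves the strategy watermark to 1999,
        // which is enough to release the first event but not the second.
        List<Long> twoEvents = Arrays.asList(1000L, 2000L);
        long wmAfterSecond = strategyWatermark(2000L, 0L); // 1999
        System.out.println("strategy after 2nd event: wm=" + wmAfterSecond
                + " released=" + released(twoEvents, wmAfterSecond));
    }
}
```

Under this model the strategy variant releases nothing until a later event arrives, which matches the behavior described in the question. As for getting immediate triggering with WatermarkStrategy, one possible approach (an assumption, not something the answer states) is to supply a custom WatermarkGenerator through WatermarkStrategy.forGenerator that emits maxTimestamp - outOfOrdernessMillis without the "- 1". Note the trade-off: a watermark of t declares that no more events with timestamp t or lower will arrive, so a later event carrying the same timestamp would then be treated as late.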
