Flink 不要用 EventTimeWindows 关闭窗口

问题描述 投票:0回答:2

为什么这段代码不给出任何东西? 如果我更改为

TumblingProcessingTimeWindows
- 一切正常。

我没有在文档中找到我必须添加的其他内容?触发器?驱逐者? 允许迟到?

  WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((i, timestamp) -> Timestamp.valueOf(i.dt).getTime());

        ds.assignTimestampsAndWatermarks(strategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).reduce((acc, i) -> {
                    acc.count += i.count;
                    acc.dt = i.dt;
                    return acc;
                }).addSink(new PrintSinkFunction());

输入:

{"userId":1,"count":11,"dt":"2023-04-11T09:29:12.244"}

系统时间=输入时间

更新二:

  1. 我在

    withTimestampAssigner
    中添加了一些打印信息 - 它在每个事件中都被调用。

  2. 我为捕获丢失的事件添加了 OutputTag - 它很清楚。

    OutputTag lateTag = new OutputTag("late"){};

  3. 我在内部添加了调试打印以减少功能 - 它会在每个事件上调用。

但是 print (sink) for close output window 没有 =(

所有代码:

 private static void m4(DataStream<UserModel> ds) {
        WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((i, timestamp) -> {
                    long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                    System.out.println(i.dt + " is: " + time +  " dont know: " + timestamp);
                    return time;
                });

        OutputTag<UserModel> lateTag = new OutputTag<UserModel>("late"){};

        SingleOutputStreamOperator<UserModel> reduce = ds.assignTimestampsAndWatermarks(strategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sideOutputLateData(lateTag)
                .reduce((acc, i) -> {
                    System.out.println(i.dt + " reDUCE:");
                    acc.count += i.count;
                    acc.dt = i.dt;
                    return acc;
                });
                reduce.getSideOutput(lateTag).print();

                reduce.addSink(new PrintSinkFunction());


    }

更新 3:

回复@kkrugler。 有趣的是

TumblingEventTimeWindows
需要
ProcessAllWindowFunction
,虽然
TumblingProcessingTimeWindows
没有它也能工作.

所以我添加了

ProcessAllWindowFunction
,但是还是没有结果,我添加了打印来调试这部分代码是not called.

有趣的一点,如果我更改为

TumblingProcessingTimeWindows
,关闭窗户和sink即使没有
ProcessAllWindowFunction

所有代码:

public class Rich extends ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow> {
    @Override
    public void process(ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow>.Context context, Iterable<UserModel> iterable, Collector<UserModelEx> collector) throws Exception {
        UserModel um = iterable.iterator().next();
        System.out.println(um.count + " rich:" + um.dt);
        collector.collect(new UserModelEx() {{
            userId = um.userId;
            count = um.count;
            wStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getStart()), ZoneOffset.UTC);
            wEnd = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getEnd()), ZoneOffset.UTC);
        }});
    }
}

 private static void m4(DataStream<UserModel> ds) {

        WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((i, timestamp) -> {
                    long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                    System.out.println(i.dt + " assignEvent: " + time + " : " + timestamp);
                    return time;
                });


        SingleOutputStreamOperator<UserModelEx> reduce = ds.assignTimestampsAndWatermarks(strategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce((acc, i) -> {
                    acc.count += i.count;
                    acc.dt = i.dt;
                    System.out.println(acc.dt + " reduce:" + acc.count);
                    return acc;
                }, new Rich());

        reduce.print();

        //reduce.addSink(new PrintSinkFunction<UserModelEx>());


    }
apache-flink flink-streaming flink-sql
2个回答
0
投票

所以 4 天后我发现了问题,但我无法解释为什么有必要。

env.setParallelism(1);
- 解决了我的问题。

我正在从 Kafka 读取主题分区 = 1,即默认情况下并行度应为 1。

希望高手解释一下为什么我的例子需要这个参数...


-1
投票

通常你会提供一个带有 reduce 调用的 ProcessAllWindowFunction,见这个 APIProcessAllWindowFunction.process 方法将为每个窗口调用,迭代器提供单个元素(所有 reduce 调用的结果)。

© www.soinside.com 2019 - 2024. All rights reserved.