为什么这段代码不给出任何东西? 如果我更改为
TumblingProcessingTimeWindows
- 一切正常。
我没有在文档中找到我必须添加的其他内容?触发器?驱逐者? 允许迟到?
WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((i, timestamp) -> Timestamp.valueOf(i.dt).getTime());
ds.assignTimestampsAndWatermarks(strategy)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).reduce((acc, i) -> {
acc.count += i.count;
acc.dt = i.dt;
return acc;
}).addSink(new PrintSinkFunction());
输入:
{"userId":1,"count":11,"dt":"2023-04-11T09:29:12.244"}
系统时间=输入时间
更新二:
我在
withTimestampAssigner
中添加了一些打印信息 - 它在每个事件中都被调用。
我为捕获丢失的事件添加了 OutputTag - 它很清楚。
OutputTag lateTag = new OutputTag("late"){};
我在内部添加了调试打印以减少功能 - 它会在每个事件上调用。
但是 print (sink) for close output window 没有 =(
所有代码:
private static void m4(DataStream<UserModel> ds) {
WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((i, timestamp) -> {
long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
System.out.println(i.dt + " is: " + time + " dont know: " + timestamp);
return time;
});
OutputTag<UserModel> lateTag = new OutputTag<UserModel>("late"){};
SingleOutputStreamOperator<UserModel> reduce = ds.assignTimestampsAndWatermarks(strategy)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
.sideOutputLateData(lateTag)
.reduce((acc, i) -> {
System.out.println(i.dt + " reDUCE:");
acc.count += i.count;
acc.dt = i.dt;
return acc;
});
reduce.getSideOutput(lateTag).print();
reduce.addSink(new PrintSinkFunction());
}
更新 3:
回复@kkrugler。 有趣的是
TumblingEventTimeWindows
需要 ProcessAllWindowFunction
,虽然 TumblingProcessingTimeWindows
没有它也能工作.
所以我添加了
ProcessAllWindowFunction
,但是还是没有结果,我添加了打印来调试这部分代码是not called.
有趣的一点,如果我更改为
TumblingProcessingTimeWindows
,关闭窗户和sink即使没有ProcessAllWindowFunction
所有代码:
public class Rich extends ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow> {
@Override
public void process(ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow>.Context context, Iterable<UserModel> iterable, Collector<UserModelEx> collector) throws Exception {
UserModel um = iterable.iterator().next();
System.out.println(um.count + " rich:" + um.dt);
collector.collect(new UserModelEx() {{
userId = um.userId;
count = um.count;
wStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getStart()), ZoneOffset.UTC);
wEnd = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getEnd()), ZoneOffset.UTC);
}});
}
}
private static void m4(DataStream<UserModel> ds) {
WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((i, timestamp) -> {
long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
System.out.println(i.dt + " assignEvent: " + time + " : " + timestamp);
return time;
});
SingleOutputStreamOperator<UserModelEx> reduce = ds.assignTimestampsAndWatermarks(strategy)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce((acc, i) -> {
acc.count += i.count;
acc.dt = i.dt;
System.out.println(acc.dt + " reduce:" + acc.count);
return acc;
}, new Rich());
reduce.print();
//reduce.addSink(new PrintSinkFunction<UserModelEx>());
}
所以 4 天后我发现了问题,但我无法解释为什么有必要。
env.setParallelism(1);
- 解决了我的问题。
我正在从 Kafka 读取主题分区 = 1,即默认情况下并行度应为 1。
希望高手解释一下为什么我的例子需要这个参数...
通常你会提供一个带有 reduce 调用的 ProcessAllWindowFunction,见这个 API。 ProcessAllWindowFunction.process 方法将为每个窗口调用,迭代器提供单个元素(所有 reduce 调用的结果)。