所以我尝试使用未加密的 processFunctions 来创建自己的窗口方案。我正在使用来源并想使用水印。我目前的水印实现如下
this.watermarkStrategy = WatermarkStrategy
.<EventBasic>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
我创建了我的源代码如下
DataStream<EventBasic> mainStream = env.readTextFile(csvFilePath)
.map(new MapFunction<String, EventBasic>() {
@Override
public EventBasic map(String line) throws Exception {
String[] parts = line.split(",");
if (parts.length == 3) {
String key = parts[0];
int valueInt = Integer.parseInt(parts[1]);
long valueTimeStamp = Long.parseLong(parts[2]);
return new EventBasic(key, valueInt, valueTimeStamp);
} else {
return null;
}
}
}).setParallelism(3).assignTimestampsAndWatermarks(watermarkStrategy).name("source");
此源函数读取具有以下格式的 CSV 文件:
key,val,timestamp
A,0,500
C,1,500
A,2,500
A,3,500
A,4,500
B,5,500
A,6,500
H,7,500
...
a,100,1500
时间戳单调增加
立即观察时(我创建了一个虚拟过程函数来观察我的时间戳是否正常工作),我不断观察到值
-9223372036854775808
。这意味着水印生成不知道何时添加新水印。
我还尝试了以下水印策略,它会产生相同的输出:
this.watermarkStrategy = WatermarkStrategy
.<EventBasic>forBoundedOutOfOrderness(Duration.ofMillis(500))
.withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
我不知道我的问题是什么,我尝试到处寻找,但似乎没有任何改变。
所以事实证明,为什么水印没有增加,但时间戳工作正常的问题是由于我已将环境设置为
BATCH
模式(env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
),因此对于flink,他们认为由于数据提前知道不需要水印。
src:https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution_mode/
我希望这能帮助任何遇到这个问题的人。