flink 中的水印没有增加

问题描述 投票:0回答:1

所以我尝试使用未加密的 processFunctions 来创建自己的窗口方案。我正在使用来源并想使用水印。我目前的水印实现如下

this.watermarkStrategy = WatermarkStrategy
                .<EventBasic>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);

我创建了我的源代码如下

DataStream<EventBasic> mainStream = env.readTextFile(csvFilePath)
            .map(new MapFunction<String, EventBasic>() {
                @Override
                public EventBasic map(String line) throws Exception {
                    String[] parts = line.split(",");
                    if (parts.length == 3) {
                        String key = parts[0];
                        int valueInt = Integer.parseInt(parts[1]);
                        long valueTimeStamp = Long.parseLong(parts[2]);
                        return new EventBasic(key, valueInt, valueTimeStamp);
                    } else {
                        return null;
                    }
                }
            }).setParallelism(3).assignTimestampsAndWatermarks(watermarkStrategy).name("source");

此源函数读取具有以下格式的 CSV 文件:

key,val,timestamp
A,0,500
C,1,500
A,2,500
A,3,500
A,4,500
B,5,500
A,6,500
H,7,500
...
a,100,1500

时间戳单调增加

立即观察时(我创建了一个虚拟过程函数来观察我的时间戳是否正常工作),我不断观察到值

-9223372036854775808
。这意味着水印生成不知道何时添加新水印。

我还尝试了以下水印策略,它会产生相同的输出:

this.watermarkStrategy = WatermarkStrategy
                .<EventBasic>forBoundedOutOfOrderness(Duration.ofMillis(500))
                .withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);

我不知道我的问题是什么,我尝试到处寻找,但似乎没有任何改变。

java csv apache-flink flink-streaming flink-batch
1个回答
0
投票

所以事实证明,为什么水印没有增加,但时间戳工作正常的问题是由于我已将环境设置为

BATCH
模式(
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
),因此对于flink,他们认为由于数据提前知道不需要水印。

src:https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution_mode/

我希望这能帮助任何遇到这个问题的人。

© www.soinside.com 2019 - 2024. All rights reserved.