我正在开发一个 Flink 作业,该作业处理
Row
元素并使用 KeyedProcessFunction
应用延迟。问题在于如何在 keyBy
函数中生成密钥。
当我使用
System.currentTimeMillis()
生成 keyBy
中的密钥时,在 NullPointerException
中向 ListState
添加元素时会得到 bufferedElements.add()
。具体来说,setCurrentKeyGroupIndex
函数计算出的密钥组索引为78,超出了为任务分配的密钥组范围(52-76),从而导致NPE。
但是,当我使用
Row
中的字段(也包含 System.currentTimeMillis()
值)作为键时,问题不会发生,并且状态管理工作正常。
这是我的代码的简化版本:
public static void main(String[] args) {
// Define row types
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.LONG},
new String[]{"src_ip", "port", "time", "time2"});
// Set up the environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(5);
// Source function
environment.addSource(new SourceFunction<Row>() {
@Override
public void run(SourceContext<Row> ctx) throws Exception {
for (int i = 0; i < 1000; i++) {
ctx.collect(Row.of("first_" + 1, "456", System.currentTimeMillis(), System.currentTimeMillis()));
Thread.sleep(50);
}
}
@Override
public void cancel() {
}
}).returns(rowTypeInfo)
.keyBy(new KeySelector<Row, String>() {
@Override
public String getKey(Row value) throws Exception {
// Causes NPE
return String.valueOf(System.currentTimeMillis());
// Works fine
// return String.valueOf(value.getField(3));
}
})
.process(new DelayFunction(5000))
.map(new MapFunction<Row, Row>() {
@Override
public Row map(Row value) throws Exception {
return value;
}
});
try {
environment.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// Delay function
public static class DelayFunction extends KeyedProcessFunction<String, Row, Row> {
private final long delayTime;
private transient ListState<Row> bufferedElements;
public DelayFunction(long delayTime) {
this.delayTime = delayTime;
}
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<Row> descriptor = new ListStateDescriptor<>("bufferedElements", Row.class);
bufferedElements = getRuntimeContext().getListState(descriptor);
}
@Override
public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
long currentTime = ctx.timerService().currentProcessingTime();
long triggerTime = currentTime + delayTime;
value.setField(0, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(triggerTime));
bufferedElements.add(value); // NPE occurs here
ctx.timerService().registerProcessingTimeTimer(triggerTime);
}
}
使用
keyBy
的System.currentTimeMillis()
函数导致setCurrentKeyGroupIndex
计算78个关键组,但我的任务只负责关键组52-76,这似乎会在添加到NullPointerException
时导致bufferedElements
.
为什么
System.currentTimeMillis()
会出现此问题,但当我使用 Row
中的字段(也包含 System.currentTimeMillis()
值)时却不会出现此问题?
任何见解将不胜感激!
使用 Flink,密钥不会进行通信/共享,而是在需要时通过调用 KeySelector 函数(共享)来计算。 Flink 要求计算键的函数是确定性的,这意味着作业的所有并行实例都必须为每个键计算相同的值,无论何时计算其值。如果没有这个保证,Flink 将会失败,而且可能会以不可预测的方式失败。
这意味着密钥不能依赖于随机的东西或随时间变化的东西,也不能是从一个 JVM 到另一个 JVM 不同的东西,比如枚举。