KeyedProcessFunction 在 keyBy 中使用 System.currentTimeMillis() 时抛出 NPE

问题描述 投票:0回答:1

我正在开发一个 Flink 作业,该作业处理

Row
元素并使用
KeyedProcessFunction
应用延迟。问题在于如何在
keyBy
函数中生成密钥。

当我使用

System.currentTimeMillis()
生成
keyBy
中的密钥时,在
NullPointerException
中向
ListState
添加元素时会得到
bufferedElements.add()
。具体来说,
setCurrentKeyGroupIndex
函数计算出的密钥组索引为78,超出了为任务分配的密钥组范围(52-76),从而导致NPE。

但是,当我使用

Row
中的字段(也包含
System.currentTimeMillis()
值)作为键时,问题不会发生,并且状态管理工作正常。

这是我的代码的简化版本:

public static void main(String[] args) {
    // Define row types
    RowTypeInfo rowTypeInfo = new RowTypeInfo(
        new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.LONG}, 
        new String[]{"src_ip", "port", "time", "time2"});

    // Set up the environment
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(5);

    // Source function
    environment.addSource(new SourceFunction<Row>() {
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            for (int i = 0; i < 1000; i++) {
                ctx.collect(Row.of("first_" + 1, "456", System.currentTimeMillis(), System.currentTimeMillis()));
                Thread.sleep(50);
            }
        }

        @Override
        public void cancel() {
        }
    }).returns(rowTypeInfo)
      .keyBy(new KeySelector<Row, String>() {
          @Override
          public String getKey(Row value) throws Exception {
              // Causes NPE
              return String.valueOf(System.currentTimeMillis());
              
              // Works fine
              // return String.valueOf(value.getField(3));
          }
      })
      .process(new DelayFunction(5000))
      .map(new MapFunction<Row, Row>() {
          @Override
          public Row map(Row value) throws Exception {
              return value;
          }
      });

    try {
        environment.execute();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

// Delay function
public static class DelayFunction extends KeyedProcessFunction<String, Row, Row> {
    private final long delayTime;
    private transient ListState<Row> bufferedElements;

    public DelayFunction(long delayTime) {
        this.delayTime = delayTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Row> descriptor = new ListStateDescriptor<>("bufferedElements", Row.class);
        bufferedElements = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
        long currentTime = ctx.timerService().currentProcessingTime();
        long triggerTime = currentTime + delayTime;
        value.setField(0, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(triggerTime));
        bufferedElements.add(value); // NPE occurs here
        ctx.timerService().registerProcessingTimeTimer(triggerTime);
    }
}

使用

keyBy
System.currentTimeMillis()
函数导致
setCurrentKeyGroupIndex
计算78个关键组,但我的任务只负责关键组52-76,这似乎会在添加到
NullPointerException
时导致
bufferedElements
.

为什么

System.currentTimeMillis()
会出现此问题,但当我使用
Row
中的字段(也包含
System.currentTimeMillis()
值)时却不会出现此问题?

任何见解将不胜感激!

enter image description here

enter image description here

apache-flink
1个回答
0
投票

使用 Flink,密钥不会进行通信/共享,而是在需要时通过调用 KeySelector 函数(共享)来计算。 Flink 要求计算键的函数是确定性的,这意味着作业的所有并行实例都必须为每个键计算相同的值,无论何时计算其值。如果没有这个保证,Flink 将会失败,而且可能会以不可预测的方式失败。

这意味着密钥不能依赖于随机的东西或随时间变化的东西,也不能是从一个 JVM 到另一个 JVM 不同的东西,比如枚举。

© www.soinside.com 2019 - 2024. All rights reserved.