在Flink任务实例内部,我需要在事件发生时访问远程Web服务以获取一些数据,但是我不想每次事件发生时都访问远程Web服务,因此我需要将数据缓存在本地内存中,可以访问所有任务的过程,该怎么做?将数据存储在类级别的静态私有变量中?
例如下面的示例,如果将本地变量localCache设置为Splitter类,则将其缓存在操作员级别而不是进程级别。
公共类WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
***private object localCache ;***
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
就像你说的那样。您将在RichFlatMapFunction
中使用静态变量,然后在open
中对其进行初始化。在馈送任何记录之前,将在每个TaskManager上调用open
。请注意,有一个为每个不同插槽创建的Splitter实例,因此在大多数情况下,一个TaskManager上有多个Splitter实例。因此,您需要防止双重创建。