如何在Flink流中在进程级缓存局部变量?

问题描述 投票:0回答:1

在Flink任务实例内部,我需要在事件发生时访问远程Web服务以获取一些数据,但是我不想每次事件发生时都访问远程Web服务,因此我需要将数据缓存在本地内存中,可以访问所有任务的过程,该怎么做?将数据存储在类级别的静态私有变量中?

例如下面的示例,如果将本地变量localCache设置为Splitter类,则将其缓存在操作员级别而不是进程级别。

公共类WindowWordCount {

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

    dataStream.print();

    env.execute("Window WordCount");
}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    ***private object localCache ;***

    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}

}

apache-flink flink-streaming
1个回答
0
投票

就像你说的那样。您将在RichFlatMapFunction中使用静态变量,然后在open中对其进行初始化。在馈送任何记录之前,将在每个TaskManager上调用open。请注意,有一个为每个不同插槽创建的Splitter实例,因此在大多数情况下,一个TaskManager上有多个Splitter实例。因此,您需要防止双重创建。

© www.soinside.com 2019 - 2024. All rights reserved.