Flink保持配置状态

问题描述 投票:1回答:1

我有一个用例维护Flink中的配置,而我实际上并不知道如何处理。

假设我有一些配置存储在某个地方,我需要它来进行处理。在Flink作业初始化时,我想加载所有配置。

也可以在Flink作业运行期间修改此配置,因此我必须将此配置的状态保存在内存中,并在需要时进行更新。可从KafkaSource访问配置更新。

所以这就是我所拥有的:

我有一个加载整个配置,保持其状态并将其与我的数据流关联的功能:

public class MyConfiguration extends RichFlatMapFunction<Row, Row>{
    private transient MapState<String, MyConfObject> configuration;

    @Override
    public void open(MyConfiguration config) throws Exception{
        MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(
                "configuration",
                BasicTypeInfo.STRING_TYPE_INFO,
                ...
        );
        configuration = getRuntimeContext().getMapState(descriptor);
        configuration.putAll(...);   // Load configuration from somewhere
    }

    @Override
    public void flatMap(Row value, Collector<Row> out) throws Exception {
        MyConfObject conf = configuration.get(...);
        ...               // Associate conf with data
        out.collect(value);
    }
}

而且我的管道看起来像这样:

DataStream<Row> dataStream = ...; // My data stream
DataStream<Map<String, MyConfObject> streamConf = 
     env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates
        .map(...); 

return dataStream
    .assignTimestampsAndWatermarks(...)
    .flatMap(new MyConfiguration())

    ... //Do some processing

    .map(m -> {
        ObjectMapper objectMapper = new ObjectMapper();
        String json = objectMapper.writeValueAsString(m);
        return json.getBytes();
    });

我想要使用配置更新流streamConf更新MyConfiguration平面映射函数内的State变量。我该怎么办?

java apache-flink flink-streaming
1个回答
0
投票
© www.soinside.com 2019 - 2024. All rights reserved.