Flink MapState在TTL清除过程中清除映射键的所有值

问题描述 投票:0回答:1

我要求每个键的最后25秒值都保持在flink映射状态,但是TTL会在25秒后立即删除所有值。请参阅我的代码,在代码列表中每秒存储每个传感器ID的传入数据,以减少内存存储,我必须在列表中仅保留25秒的数据,有什么方法可以实现? TTL清除整个列表。

public class ContinousDataProcessor
    extends   KeyedProcessFunction<String,SensorData,Tuple2<String,Integer>> {

private transient MapState<String, List<SensorData>> SensorValueMapState;
private static final long serialVersionUID = 1L;

@Override
public void open(Configuration config) {
    MapStateDescriptor<String, List<SensorData>> varibaleTagValueMapDescriptor = new MapStateDescriptor(
            "variableTagValueMapState", String.class, SensorData.class);
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(25))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();

    varibaleTagValueMapDescriptor.enableTimeToLive(ttlConfig);
    SensorValueMapState= getRuntimeContext().getMapState(varibaleTagValueMapDescriptor);

}

@Override
public void processElement(SensorData inputData, Context arg1, Collector arg2) throws Exception {


    if (SensorValueMapState.contains(inputData.sensorId)) {
        SensorValueMapState.get(inputData.sensorId).add(inputData);
    } else {
        List<SensorData> sensorDataList = new ArrayList<>();
        sensorDataList.add(inputData);
        SensorValueMapState.put(inputData.sensorId, sensorDataList);
    }

            for (SensorData str : SensorValueMapState.get(inputData.sensorId)) {

        System.out.println(str.eventTime);          
    }}
apache-flink flink-streaming
1个回答
0
投票

据我了解,您希望对列表中的每个元素应用TTL。该列表在您的情况下是处于地图状态的值。地图状态对地图状态下的用户价值结构没有任何了解。这是状态后端中数据布局的限制。因此,在当前实现中无法对每个元素应用TTL。

在值状态下,每个用户值应用TTL;在列表状态下,每个用户元素应用TTL;在映射状态下,每个用户键/值对应用TTL。

根据您的应用程序的要求,您可以尝试使用组合键列出状态:

key of KeyedProcessFunction = current key of your KeyedProcessFunction + your current map state key

这无法通过KeyedProcessFunction的当前键轻松获取所有列表,尽管您现在可以这样做。

© www.soinside.com 2019 - 2024. All rights reserved.