我要求每个键的最后25秒值都保持在flink映射状态,但是TTL会在25秒后立即删除所有值。请参阅我的代码,在代码列表中每秒存储每个传感器ID的传入数据,以减少内存存储,我必须在列表中仅保留25秒的数据,有什么方法可以实现? TTL清除整个列表。
public class ContinousDataProcessor
extends KeyedProcessFunction<String,SensorData,Tuple2<String,Integer>> {
private transient MapState<String, List<SensorData>> SensorValueMapState;
private static final long serialVersionUID = 1L;
@Override
public void open(Configuration config) {
MapStateDescriptor<String, List<SensorData>> varibaleTagValueMapDescriptor = new MapStateDescriptor(
"variableTagValueMapState", String.class, SensorData.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(25))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
varibaleTagValueMapDescriptor.enableTimeToLive(ttlConfig);
SensorValueMapState= getRuntimeContext().getMapState(varibaleTagValueMapDescriptor);
}
@Override
public void processElement(SensorData inputData, Context arg1, Collector arg2) throws Exception {
if (SensorValueMapState.contains(inputData.sensorId)) {
SensorValueMapState.get(inputData.sensorId).add(inputData);
} else {
List<SensorData> sensorDataList = new ArrayList<>();
sensorDataList.add(inputData);
SensorValueMapState.put(inputData.sensorId, sensorDataList);
}
for (SensorData str : SensorValueMapState.get(inputData.sensorId)) {
System.out.println(str.eventTime);
}}
据我了解,您希望对列表中的每个元素应用TTL。该列表在您的情况下是处于地图状态的值。地图状态对地图状态下的用户价值结构没有任何了解。这是状态后端中数据布局的限制。因此,在当前实现中无法对每个元素应用TTL。
在值状态下,每个用户值应用TTL;在列表状态下,每个用户元素应用TTL;在映射状态下,每个用户键/值对应用TTL。
根据您的应用程序的要求,您可以尝试使用组合键列出状态:
key of KeyedProcessFunction = current key of your KeyedProcessFunction + your current map state key
这无法通过KeyedProcessFunction的当前键轻松获取所有列表,尽管您现在可以这样做。