I have a custom AggregateFunction with the following signature:
public class CustomAggregateFunction
        implements AggregateFunction<CustomInput, AggregationAccumulator, CustomOutput> { code...}
My AggregationAccumulator is a simple object that holds a few maps and is annotated with Lombok's @Data:
@Data
public static class AggregationAccumulator {
    private Map<String, Long> customMap = new HashMap<>();
}
However, Flink logs the following:
13:18:50,091 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field AggregationAccumulator#customMap will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
How can I provide type information so that Flink does not fall back to Kryo for this field?
You can add a @TypeInfo annotation to the map field and provide a suitable TypeInfoFactory implementation. Something like this:
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;

public class AccumulatorWithMap {

    // Tell the type extractor to ask the factory below instead of treating the map as a GenericType.
    @TypeInfo(MapTypeInfoFactory.class)
    private Map<String, Long> customMap = new HashMap<>();

    // Public no-arg constructor plus getter and setter keep the class a valid Flink POJO.
    public AccumulatorWithMap() { }

    public Map<String, Long> getCustomMap() {
        return customMap;
    }

    public void setCustomMap(Map<String, Long> customMap) {
        this.customMap = customMap;
    }

    public static class MapTypeInfoFactory<K, V> extends TypeInfoFactory<Map<K, V>> {

        @SuppressWarnings("unchecked")
        @Override
        public TypeInformation<Map<K, V>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // Look up the key and value type information by type variable name.
            TypeInformation<K> keyType = (TypeInformation<K>) genericParameters.get("K");
            TypeInformation<V> valueType = (TypeInformation<V>) genericParameters.get("V");

            if (keyType == null) {
                throw new InvalidTypesException(
                        "Type extraction is not possible on Map"
                                + " type as it does not contain information about the 'key' type.");
            }
            if (valueType == null) {
                throw new InvalidTypesException(
                        "Type extraction is not possible on Map"
                                + " type as it does not contain information about the 'value' type.");
            }

            return new MapTypeInfo<>(keyType, valueType);
        }
    }
}
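
The factory above looks up "K" and "V" in genericParameters. As far as I understand, those keys are the type variable names declared on java.util.Map<K, V>, each paired with the actual type argument found on the field. The sketch below reproduces that pairing with plain Java reflection only, so it runs without Flink. The class GenericParameterNames, its nested Accumulator, and the helpers bindTypeVariables and bindField are hypothetical names made up for this illustration; they are not part of Flink, and Flink's own extractor hands the factory TypeInformation objects rather than raw Type values.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class GenericParameterNames {

    // Hypothetical stand-in for the accumulator; only the field's declared generic type matters.
    static class Accumulator {
        private Map<String, Long> customMap = new HashMap<>();
    }

    // Pairs each type variable of the field's raw class (K and V for java.util.Map)
    // with the actual type argument written on the field declaration.
    static Map<String, Type> bindTypeVariables(Field field) {
        Type generic = field.getGenericType();
        if (!(generic instanceof ParameterizedType)) {
            throw new IllegalArgumentException("Field is not declared with type arguments: " + field);
        }
        ParameterizedType declared = (ParameterizedType) generic;
        Class<?> raw = (Class<?>) declared.getRawType();
        TypeVariable<?>[] variables = raw.getTypeParameters();
        Type[] arguments = declared.getActualTypeArguments();

        Map<String, Type> bound = new LinkedHashMap<>();
        for (int i = 0; i < variables.length; i++) {
            bound.put(variables[i].getName(), arguments[i]);
        }
        return bound;
    }

    // Convenience wrapper that turns the checked reflection exception into an unchecked one.
    static Map<String, Type> bindField(Class<?> owner, String fieldName) {
        try {
            return bindTypeVariables(owner.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        // Prints the name-to-type binding for the customMap field.
        System.out.println(bindField(Accumulator.class, "customMap"));
    }
}
```

If the field were declared as a raw Map, there would be no type arguments to bind, which corresponds to the case where the factory's null checks throw InvalidTypesException.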