Giving Flink type information to serialize an accumulator object without Kryo


I have a custom AggregateFunction with the following signature:

public class CustomAggregateFunction
    implements AggregateFunction<CustomInput, AggregationAccumulator, CustomOutput> { code...}

My AggregationAccumulator is a simple object that holds a few maps and is annotated with Lombok's @Data:

  @Data
  public static class AggregationAccumulator {
    private Map<String, Long> customMap = new HashMap<>();
  }

However, Flink logs the following:

13:18:50,091 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Field AggregationAccumulator#customMap will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

How can I provide type information so that Flink does not fall back to Kryo for this field?

java apache-flink flink-streaming
1 Answer

You can add a @TypeInfo annotation to the map field and provide a suitable TypeInfoFactory implementation. Something like this:

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;

public class AccumulatorWithMap {

    @TypeInfo(MapTypeInfoFactory.class)
    private Map<String, Long> customMap = new HashMap<>();
    
    public AccumulatorWithMap() { }

    public Map<String, Long> getCustomMap() {
        return customMap;
    }

    public void setCustomMap(Map<String, Long> customMap) {
        this.customMap = customMap;
    }
    
    public static class MapTypeInfoFactory<K, V> extends TypeInfoFactory<Map<K, V>> {

        @SuppressWarnings("unchecked")
        @Override
        public TypeInformation<Map<K, V>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            TypeInformation<K> keyType = (TypeInformation<K>)genericParameters.get("K");
            TypeInformation<V> valueType = (TypeInformation<V>)genericParameters.get("V");

            if (keyType == null) {
                throw new InvalidTypesException(
                        "Type extraction is not possible on Map"
                                + " type as it does not contain information about the 'key' type.");
            }

            if (valueType == null) {
                throw new InvalidTypesException(
                        "Type extraction is not possible on Map"
                                + " type as it does not contain information about the 'value' type.");
            }
            
            return new MapTypeInfo<K, V>(keyType, valueType);
        }
    }

    
}
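The factory above looks up "K" and "V" because those are the names of the type variables declared by java.util.Map, and (as far as I understand it) Flink keys the genericParameters map by type variable name. The following is a minimal sketch in plain Java reflection, with no Flink dependency, that shows how those names line up with the actual type arguments of the field. The class MapTypeParameterDemo, its nested Accumulator, and the helper resolveTypeArguments are hypothetical names used only for illustration; this is not Flink's own extraction code.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapTypeParameterDemo {

    // Hypothetical stand-in for the accumulator; only the field declaration matters here.
    static class Accumulator {
        private Map<String, Long> customMap = new HashMap<>();
    }

    /**
     * Pairs each type variable name of the field's raw type (e.g. "K", "V" for Map)
     * with the type argument written in the field declaration.
     */
    static Map<String, String> resolveTypeArguments(Class<?> owner, String fieldName) {
        Field field;
        try {
            field = owner.getDeclaredField(fieldName);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }

        // The generic signature of a field survives erasure and is readable via reflection.
        Type genericType = field.getGenericType();
        if (!(genericType instanceof ParameterizedType)) {
            throw new IllegalArgumentException(fieldName + " is not a parameterized type");
        }
        ParameterizedType fieldType = (ParameterizedType) genericType;
        Class<?> rawType = (Class<?>) fieldType.getRawType();

        TypeVariable<?>[] variables = rawType.getTypeParameters();
        Type[] arguments = fieldType.getActualTypeArguments();

        // Insertion order is kept so the result follows the declaration order of the variables.
        Map<String, String> resolved = new LinkedHashMap<>();
        for (int i = 0; i < variables.length; i++) {
            resolved.put(variables[i].getName(), arguments[i].getTypeName());
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Prints {K=java.lang.String, V=java.lang.Long}
        System.out.println(resolveTypeArguments(Accumulator.class, "customMap"));
    }
}
```

If the field were declared as a raw Map, there would be no type arguments to read, which corresponds to the case where the factory above throws InvalidTypesException.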