在 Flink 中使用自己的 jackson 版本会导致VerifyError

问题描述 投票:0回答:1

我将 Apache Flink (v1.11) 与 Scala 结合使用,并为 Kafka 连接器添加了自己的 DeserializationSchema。因此我想使用我自己的软件包和 jackson 版本(v2.12.0)。

但是我收到以下错误:

    Exception in thread "main" java.lang.VerifyError: Cannot inherit from final class
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at com.fasterxml.jackson.dataformat.csv.CsvMapper.<init>(CsvMapper.java:108)
        at de.integration_factory.datastream.types.CovidEventSchema.<init>(CovidEventSchema.scala:14)
        at de.integration_factory.datastream.Aggregate_Datastream$.main(Aggregate_Datastream.scala:34)
        at de.integration_factory.datastream.Aggregate_Datastream.main(Aggregate_Datastream.scala)

这是我的事件架构:

    import com.fasterxml.jackson.dataformat.csv.CsvMapper
    import com.fasterxml.jackson.datatype.joda.JodaModule
    import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
    import org.apache.flink.api.common.typeinfo.TypeInformation

    @SerialVersionUID(6154188370181669758L)
    class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {
    
      private val mapper = new CsvMapper
      mapper.registerModule(new JodaModule)
    
      val csvSchema = mapper
        .schemaFor(classOf[CovidEvent])
        .withLineSeparator(",")
        .withoutHeader()
      val  reader = mapper.readerWithSchemaFor(classOf[CovidEvent])
    
      def serialize(event: CovidEvent): Array[Byte] = mapper.writer(csvSchema).writeValueAsBytes()
    
      @throws[IOException]
      def deserialize(message: Array[Byte]): CovidEvent = reader.readValue[CovidEvent](message)
    
    
      def isEndOfStream(nextElement: CovidEvent) = false
    
      def getProducedType: TypeInformation[CovidEvent] = TypeInformation.of(classOf[CovidEvent])
    }

This is my PoJo for schema:

    import com.fasterxml.jackson.annotation.JsonFormat;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.joda.time.DateTime;
    
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class CovidEvent {
    
        private long objectId;
        private int bundeslandId;
        private String bundesland;
        private String landkreis;
        private String altersgruppe;
        private String geschlecht;
        private int anzahlFall;
        private int anzahlTodesfall;
        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
        private DateTime meldedatum;
        private int landkreisId;
        private String datenstand;
        private int neuerFall;
        private int neuerTodesfall;
        private String refDatum;
        private int neuGenesen;
        private int anzahlGenesen;
        @JsonFormat(shape = JsonFormat.Shape.NUMBER)
        private boolean istErkrankungsbeginn;
        private String altersGruppe2;
    
        public long getEventtime() {
            return meldedatum.getMillis();
        }
    
    }

经过一些研究我发现该错误可能是由类路径中不同的Jackson版本引起的。

我认为可以使用自己的 Jackson 版本,因为 Flink 屏蔽了自己的版本。

我做错了什么?

更新:如果我从着色的 flink 包导入 Jackson 类,它就可以工作了

但是所以我依赖于 flink Shaded Jackson 版本。

更新:那么使用

open
更好的实现是这样的?

    class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {
    
       private  var  reader: ObjectReader = null
    
      private var writer: ObjectWriter = null
    
      override def open(context: SerializationSchema.InitializationContext): Unit = {
    
         val mapper  = new CsvMapper()
    
         val csvSchema = mapper
          .schemaFor(classOf[CovidEvent])
          .withLineSeparator(",")
          .withoutHeader()
    
        this.reader = mapper.readerFor(classOf[CovidEvent]).`with`(csvSchema)
        this.writer = mapper.writer(csvSchema)
        super.open(context)
      }
    }
jackson apache-flink flink-streaming jackson-databind jackson2
1个回答
0
投票

如果使用Flink的classloader就可以了。但是,按照您的设置工作方式,您只需在创建整个 DataStream 应用程序时将用户代码加载到系统类加载器中即可。我不会详细介绍更多细节(除非后续要求)并寻求解决方案:

您的 DeserializationSchema 不应在创建期间初始化大量资源(这发生在客户端或作业管理器端),而只能在

open
中初始化(这发生在任务管理器上)。所以请移动

private val mapper = new CsvMapper
  mapper.registerModule(new JodaModule)

进入

open

它仅适用于捆绑版本,因为 - 幸运的是 -

ObjectMapper implements Serializable
但解析器很少出现这种情况,如果解串器初始化正确,实际上完全没有必要。

© www.soinside.com 2019 - 2024. All rights reserved.