我将 Apache Flink (v1.11) 与 Scala 结合使用,并为 Kafka 连接器添加了自己的 DeserializationSchema。因此我想使用我自己的软件包和 jackson 版本(v2.12.0)。
但是我收到以下错误:
Exception in thread "main" java.lang.VerifyError: Cannot inherit from final class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at com.fasterxml.jackson.dataformat.csv.CsvMapper.<init>(CsvMapper.java:108)
at de.integration_factory.datastream.types.CovidEventSchema.<init>(CovidEventSchema.scala:14)
at de.integration_factory.datastream.Aggregate_Datastream$.main(Aggregate_Datastream.scala:34)
at de.integration_factory.datastream.Aggregate_Datastream.main(Aggregate_Datastream.scala)
这是我的事件架构:
import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.datatype.joda.JodaModule
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
@SerialVersionUID(6154188370181669758L)
class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {
private val mapper = new CsvMapper
mapper.registerModule(new JodaModule)
val csvSchema = mapper
.schemaFor(classOf[CovidEvent])
.withLineSeparator(",")
.withoutHeader()
val reader = mapper.readerWithSchemaFor(classOf[CovidEvent])
def serialize(event: CovidEvent): Array[Byte] = mapper.writer(csvSchema).writeValueAsBytes()
@throws[IOException]
def deserialize(message: Array[Byte]): CovidEvent = reader.readValue[CovidEvent](message)
def isEndOfStream(nextElement: CovidEvent) = false
def getProducedType: TypeInformation[CovidEvent] = TypeInformation.of(classOf[CovidEvent])
}
This is my PoJo for schema:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.time.DateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CovidEvent {
private long objectId;
private int bundeslandId;
private String bundesland;
private String landkreis;
private String altersgruppe;
private String geschlecht;
private int anzahlFall;
private int anzahlTodesfall;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
private DateTime meldedatum;
private int landkreisId;
private String datenstand;
private int neuerFall;
private int neuerTodesfall;
private String refDatum;
private int neuGenesen;
private int anzahlGenesen;
@JsonFormat(shape = JsonFormat.Shape.NUMBER)
private boolean istErkrankungsbeginn;
private String altersGruppe2;
public long getEventtime() {
return meldedatum.getMillis();
}
}
经过一些研究我发现该错误可能是由类路径中不同的Jackson版本引起的。
我认为可以使用自己的 Jackson 版本,因为 Flink 屏蔽了自己的版本。
我做错了什么?
更新:如果我从着色的 flink 包导入 Jackson 类,它就可以工作了
但是所以我依赖于 flink Shaded Jackson 版本。
更新:那么使用
open
更好的实现是这样的?
class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {
private var reader: ObjectReader = null
private var writer: ObjectWriter = null
override def open(context: SerializationSchema.InitializationContext): Unit = {
val mapper = new CsvMapper()
val csvSchema = mapper
.schemaFor(classOf[CovidEvent])
.withLineSeparator(",")
.withoutHeader()
this.reader = mapper.readerFor(classOf[CovidEvent]).`with`(csvSchema)
this.writer = mapper.writer(csvSchema)
super.open(context)
}
}
如果使用Flink的classloader就可以了。但是,按照您的设置工作方式,您只需在创建整个 DataStream 应用程序时将用户代码加载到系统类加载器中即可。我不会详细介绍更多细节(除非后续要求)并寻求解决方案:
您的 DeserializationSchema 不应在创建期间初始化大量资源(这发生在客户端或作业管理器端),而只能在
open
中初始化(这发生在任务管理器上)。所以请移动
private val mapper = new CsvMapper
mapper.registerModule(new JodaModule)
进入
open
。
它仅适用于捆绑版本,因为 - 幸运的是 -
ObjectMapper implements Serializable
但解析器很少出现这种情况,如果解串器初始化正确,实际上完全没有必要。