I made a custom KeyedDeserializationSchema to deserialize Kafka messages, and I use it like this:
// Imports reconstructed for context; they assume the Flink 1.4/1.5-era APIs
// that FlinkKafkaConsumer011 ships with.
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

object Job {
  case class KafkaMsg[K, V](
      key: K, value: V, topic: String, partition: Int, offset: Long)

  trait Deser[A] {
    def deser(a: Array[Byte]): A
  }

  object Deser {
    def apply[A](implicit sh: Deser[A]): Deser[A] = sh
    def deser[A: Deser](a: Array[Byte]) = Deser[A].deser(a)

    implicit val stringDeser: Deser[String] =
      new Deser[String] {
        def deser(a: Array[Byte]): String = "" // dummy implementation
      }

    implicit val longDeser: Deser[Long] =
      new Deser[Long] {
        def deser(a: Array[Byte]): Long = 0 // dummy implementation
      }
  }

  class TypedKeyedDeserializationSchema[
      K: Deser: TypeInformation,
      V: Deser: TypeInformation
  ] extends KeyedDeserializationSchema[KafkaMsg[K, V]] {
    def deserialize(key: Array[Byte],
                    value: Array[Byte],
                    topic: String,
                    partition: Int,
                    offset: Long
    ): KafkaMsg[K, V] =
      KafkaMsg(Deser[K].deser(key),
               Deser[V].deser(value),
               topic,
               partition,
               offset)

    def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

    def getProducedType(): TypeInformation[KafkaMsg[K, V]] =
      createTypeInformation
  }

  def main(args: Array[String]): Unit = {
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env
      .addSource(new FlinkKafkaConsumer011(
        "topic",
        new TypedKeyedDeserializationSchema[String, Long],
        properties
      ))
      .print

    env.execute("Flink Scala API Skeleton")
  }
}
This gives me:
[error] Caused by: java.io.NotSerializableException: l7.Job$Deser$$anon$7
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[error] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[error] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[error] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[error] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[error] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[error] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[error] at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
[error] at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
[error] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
[error] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
[error] at l7.Job$.main(Job.scala:89)
[error] at l7.Job.main(Job.scala)
The problem is clearly in my Deser typeclass implementation, but I don't understand what exactly causes this error or how to fix it.
Yes, the cause of this error is that your Deser, unlike TypeInformation, does not extend/implement Serializable. To find out why this happens, first ask yourself a question: why do I need to declare implicit val stringDeser and implicit val longDeser at all?
The answer lies in what the Scala compiler does when it sees a generic constraint of the form K: Deser: TypeInformation: it rewrites it using implicit evidence parameters. So your code is transformed into something like this:
class TypedKeyedDeserializationSchema[K, V](implicit val kDeserEv: Deser[K],
                                            val kTypeInfoEv: TypeInformation[K],
                                            val vDeserEv: Deser[V],
                                            val vTypeInfoEv: TypeInformation[V]
) extends KeyedDeserializationSchema[KafkaMsg[K, V]] {
  def deserialize(key: Array[Byte],
                  value: Array[Byte],
                  topic: String,
                  partition: Int,
                  offset: Long
  ): KafkaMsg[K, V] =
    KafkaMsg(kDeserEv.deser(key),
             vDeserEv.deser(value),
             topic,
             partition,
             offset)

  def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false

  def getProducedType(): TypeInformation[KafkaMsg[K, V]] = createTypeInformation
}
Now it is clear that an object of type TypedKeyedDeserializationSchema[String, Long] actually contains two fields of types Deser[String] and Deser[Long], whose values are the implicit vals you declared above. So when Flink tries to ensure that the schema you pass to it is Serializable, the check fails.
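You can reproduce the failure in isolation, without Flink at all. Here is a minimal sketch (the object name Repro and the value d are just for illustration) that pushes an anonymous Deser instance through plain Java serialization, which is essentially what Flink's ClosureCleaner.ensureSerializable does per your stack trace:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object Repro extends App {
  trait Deser[A] { // note: does NOT extend Serializable
    def deser(a: Array[Byte]): A
  }

  val d: Deser[String] = new Deser[String] {
    def deser(a: Array[Byte]): String = new String(a, "UTF-8")
  }

  val out = new ObjectOutputStream(new ByteArrayOutputStream)
  try out.writeObject(d) // throws: the anonymous class is not Serializable
  catch { case e: NotSerializableException => println(s"failed as expected: $e") }
}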
Now the solution is obvious: make your trait Deser[A] extend Serializable:
trait Deser[A] extends Serializable {
  def deser(a: Array[Byte]): A
}
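With this one change the anonymous instances created by new Deser[String] { ... } inherit Serializable, so the implicit vals themselves need no modification. A quick sketch to verify (again, the object name SerializableCheck is just for illustration):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializableCheck extends App {
  trait Deser[A] extends Serializable { // the fix applied
    def deser(a: Array[Byte]): A
  }

  val d: Deser[String] = new Deser[String] {
    def deser(a: Array[Byte]): String = new String(a, "UTF-8")
  }

  val out = new ObjectOutputStream(new ByteArrayOutputStream)
  out.writeObject(d) // succeeds: the anonymous subclass inherits Serializable
  out.close()
  println("Deser instance serialized successfully")
}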