I have a DataStream[GenericRecord]:
val consumer = new FlinkKafkaConsumer[String]("input_csv_topic", new SimpleStringSchema(), properties)

val stream = senv.
  addSource(consumer).
  map(line => {
    val arr = line.split(",")
    val schemaUrl = "" // avro schema link, standard .avsc file format
    val schemaStr = scala.io.Source.fromURL(schemaUrl).mkString.toString().stripLineEnd
    import org.codehaus.jettison.json.{JSONObject, JSONArray}
    val schemaFields: JSONArray = new JSONObject(schemaStr).optJSONArray("fields")
    val genericDevice: GenericRecord = new GenericData.Record(new Schema.Parser().parse(schemaStr))
    for (i <- 0 until arr.length) {
      val fieldObj: JSONObject = schemaFields.optJSONObject(i)
      val columnName = fieldObj.optString("name")
      var columnType = fieldObj.optString("type")
      if (columnType.contains("string")) {
        genericDevice.put(columnName, arr(i))
      } else if (columnType.contains("int")) {
        genericDevice.put(columnName, toInt(arr(i)).getOrElse(0).asInstanceOf[Number].intValue)
      } else if (columnType.contains("long")) {
        genericDevice.put(columnName, toLong(arr(i)).getOrElse(0).asInstanceOf[Number].longValue)
      }
    }
    genericDevice
  })

val kafkaSink = new FlinkKafkaProducer[GenericRecord](
  "output_avro_topic",
  new MyKafkaAvroSerializationSchema[GenericRecord](classOf[GenericRecord], "output_avro_topic", "this is the key", schemaStr),
  properties,
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

stream.addSink(kafkaSink)
Here is the MyKafkaAvroSerializationSchema implementation:
class MyKafkaAvroSerializationSchema[T](avroType: Class[T], topic: String, key: String, schemaStr: String) extends KafkaSerializationSchema[T] {

  lazy val schema: Schema = new Schema.Parser().parse(schemaStr)

  override def serialize(element: T, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    val cl = Thread.currentThread().getContextClassLoader()
    val genericData = new GenericData(cl)
    val writer = new GenericDatumWriter[T](schema, genericData)
    // val writer = new ReflectDatumWriter[T](schema)
    // val writer = new SpecificDatumWriter[T](schema)
    val out = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(element, encoder)
    encoder.flush()
    out.close()
    new ProducerRecord[Array[Byte], Array[Byte]](topic, key.getBytes, out.toByteArray)
  }
}
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
How do I serialize an Avro GenericRecord to Kafka with Flink? I have tried different writers but I still get com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException. Thanks for your input.
You can simply add the flink-avro module to your project and use the AvroSerializationSchema it already provides, which works for both SpecificRecord and GenericRecord once you supply the schema.
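A minimal sketch of what that could look like for the Scala pipeline in the question, assuming the flink-avro dependency is on the classpath and reusing schemaStr, properties and stream from above (double-check the import paths against your Flink version):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.AvroSerializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

val schema: Schema = new Schema.Parser().parse(schemaStr)

// Make Avro type information available to the Scala API at the place where the
// map(...) producing GenericRecord is defined, so the records travelling between
// operators are not handed to Kryo.
implicit val genericRecordTypeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)

// Let flink-avro do the binary encoding instead of a hand-rolled KafkaSerializationSchema.
val valueSerializer: AvroSerializationSchema[GenericRecord] = AvroSerializationSchema.forGeneric(schema)

val kafkaSink = new FlinkKafkaProducer[GenericRecord]("output_avro_topic", valueSerializer, properties)
stream.addSink(kafkaSink)

The important detail is that the implicit TypeInformation has to be in scope where the map(...) producing GenericRecord is declared; otherwise Flink types the stream as a generic Kryo type and the same exception comes back no matter which sink serializer is used.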
Did you ever find a solution?
I ran into a similar problem writing GenericRecord to a PubSubSink on Flink 1.19. I tried the following SerializationSchema definitions:
AvroSerializationSchema<GenericRecord> serializationSchema = AvroSerializationSchema.forGeneric(schema);
and
TypeInformationSerializationSchema<GenericRecord> serializationSchema =
    new TypeInformationSerializationSchema<>(
        new GenericRecordAvroTypeInfo(schema), new AvroSerializer<>(GenericRecord.class, schema));

but I still hit the same KryoException as you did.
The sink code is as follows:
try {
    PubSubSink<GenericRecord> pubSubSink = PubSubSink.newBuilder()
            .withSerializationSchema(serializationSchema)
            .withProjectName("project-id")
            .withTopicName("topic")
            // For emulation
            .withHostAndPortForEmulator(properties.getEmulatorHostAndPort())
            .withCredentials(EmulatorCredentials.getInstance())
            .build();
    stream.addSink(pubSubSink).name(this.getClass().getName());
} catch (IOException e) {
    throw new UncheckedIOException("Exception while creating PubSubSink", e);
}
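One observation that may help with debugging: in both stack traces Kryo is serializing the fields of GenericData$Record itself (schema, fieldMap, reserved), which suggests the stream's own type information is the problem rather than the sink's SerializationSchema. A quick way to confirm that, sketched against the senv environment from the original question, is to forbid Kryo fallback entirely so the job fails at the operator that still produces a generic type:

// Throw immediately wherever a data type would fall back to Kryo,
// instead of failing later inside the sink's serialization path.
senv.getConfig.disableGenericTypes()

If that is indeed the cause, the fix is to attach GenericRecordAvroTypeInfo to the stream itself (in the Java API, .returns(new GenericRecordAvroTypeInfo(schema)) on the operator that emits the records), as in the sketch after the flink-avro suggestion above.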