Avro 特定记录类型与通用记录类型 - 哪个最好或者我可以在之间进行转换吗?

问题描述 投票:0回答:2

我们正在尝试在提供通用记录格式还是特定记录格式以供客户使用之间做出决定 着眼于提供在线模式注册表,以便在模式更新时客户端可以访问。 我们希望发送序列化的 blob,前缀为几个字节,表示版本号,因此模式 从我们的注册表中检索可以自动化。

现在,我们遇到了代码示例,说明了通用格式的相对适应性 模式发生变化,但我们不愿意放弃特定模式提供的类型安全性和易用性 格式。

有没有办法两全其美? IE。我们可以使用和操纵特定生成的 内部类,然后让它们在序列化之前自动将它们转换为通用记录?
然后,客户端将反序列化通用记录(在查找架构之后)。

此外,客户可以稍后将他们收到的这些通用记录转换为特定记录吗? 一些小代码示例会很有帮助!

或者我们看待这一切的方式都是错误的?

serialization avro
2个回答
4
投票

您正在寻找的是 Confluence Schema 注册表服务和库,有助于与其集成。

提供一个示例来使用不断发展的模式编写序列化和反序列化 avro 数据。请注意提供 Kafka 的样本。

import io.confluent.kafka.serializers.KafkaAvroDeserializer;  
import io.confluent.kafka.serializers.KafkaAvroSerializer; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.commons.codec.DecoderException; 
import org.apache.commons.codec.binary.Hex;

import java.util.HashMap; import java.util.Map;

public class ConfluentSchemaService {

    public static final String TOPIC = "DUMMYTOPIC";

    private KafkaAvroSerializer avroSerializer;
    private KafkaAvroDeserializer avroDeserializer;

    public ConfluentSchemaService(String conFluentSchemaRigistryURL) {

        //PropertiesMap
        Map<String, String> propMap = new HashMap<>();
        propMap.put("schema.registry.url", conFluentSchemaRigistryURL);
        // Output afterDeserialize should be a specific Record and not Generic Record
        propMap.put("specific.avro.reader", "true");

        avroSerializer = new KafkaAvroSerializer();
        avroSerializer.configure(propMap, true);

        avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(propMap, true);
    }

    public String hexBytesToString(byte[] inputBytes) {
        return Hex.encodeHexString(inputBytes);
    }

    public byte[] hexStringToBytes(String hexEncodedString) throws DecoderException {
        return Hex.decodeHex(hexEncodedString.toCharArray());
    }

    public byte[] serializeAvroPOJOToBytes(GenericRecord avroRecord) {
        return avroSerializer.serialize(TOPIC, avroRecord);
    }

    public Object deserializeBytesToAvroPOJO(byte[] avroBytearray) {
        return avroDeserializer.deserialize(TOPIC, avroBytearray);
    } }

以下课程包含您正在寻找的所有代码。 io.confluence.kafka.serializers.KafkaAvroDeserializer;
io.confluence.kafka.serializers.KafkaAvroSerializer;


2
投票

我可以在它们之间进行转换吗?

我编写了以下 kotlin 代码,通过 JSON 将

SpecificRecord
转换为
GenericRecord
并返回。

PositionReport
是使用 gradle 的 avro 插件从 avro 生成的对象 - 它是:


@org.apache.avro.specific.AvroGenerated
public class PositionReport extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
...

使用的功能如下


 /**
     * Encodes a record in AVRO Compatible JSON, meaning union types
     * are wrapped. For prettier JSON just use the Object Mapper
     * @param pos PositionReport
     * @return String
     */
    private fun PositionReport.toAvroJson() : String {
        val writer = SpecificDatumWriter(PositionReport::class.java)
        val baos = ByteArrayOutputStream()

        val jsonEncoder = EncoderFactory.get().jsonEncoder(this.schema, baos)
        writer.write(this, jsonEncoder)
        jsonEncoder.flush()
        return baos.toString("UTF-8")
    }

    /**
     * Converts from Genreic Record into JSON - Seems smarter, however,
     * to unify this function and the one above but whatevs
     * @param record GenericRecord
     * @param schema Schema
     */
    private fun GenericRecord.toAvroJson(): String {
        val writer = GenericDatumWriter<Any>(this.schema)
        val baos = ByteArrayOutputStream()

        val jsonEncoder = EncoderFactory.get().jsonEncoder(this.schema, baos)
        writer.write(this, jsonEncoder)
        jsonEncoder.flush()
        return baos.toString("UTF-8")
    }

    /**
     * Takes a Generic Record of a position report and hopefully turns
     * it into a position report... maybe it will work
     * @param gen GenericRecord
     * @return PositionReport
     */
    private fun toPosition(gen: GenericRecord) : PositionReport {

        if (gen.schema != PositionReport.getClassSchema()) {
            throw Exception("Cannot convert GenericRecord to PositionReport as the Schemas do not match")
        }

        // We will convert into JSON - and use that to then convert back to the SpecificRecord
        // Probalby there is a better way
        val json = gen.toAvroJson()

        val reader: DatumReader<PositionReport> = SpecificDatumReader(PositionReport::class.java)
        val decoder: Decoder = DecoderFactory.get().jsonDecoder(PositionReport.getClassSchema(), json)
        val pos = reader.read(null, decoder)
        return pos
    }

    /**
     * Converts a Specific Record to a Generic Record (I think)
     * @param pos PositionReport
     * @return GenericData.Record
     */
    private fun toGenericRecord(pos: PositionReport): GenericData.Record {
        val json = pos.toAvroJson()

        val reader : DatumReader<GenericData.Record> = GenericDatumReader(pos.schema)
        val decoder: Decoder = DecoderFactory.get().jsonDecoder(pos.schema, json)
        val datum = reader.read(null, decoder)
        return datum
    }

但是两者之间有一些区别:

  • SpecificRecord
    中属于
    Instant
    类型的字段将在
    GenericRecord
    中编码,因为
    long
    和枚举略有不同

例如,在我的此函数的单元测试中,时间字段的测试如下:

val gen = toGenericRecord(basePosition)
assertEquals(basePosition.getIgtd().toEpochMilli(), gen.get("igtd"))

枚举通过字符串进行验证

val gen = toGenericRecord(basePosition)
assertEquals(basePosition.getSource().toString(), gen.get("source").toString())

因此,要在之间进行转换,您可以这样做:


val gen = toGenericRecord(basePosition)
val newPos = toPosition(gen)

assertEquals(newPos, basePosition)

© www.soinside.com 2019 - 2024. All rights reserved.