Kafka流中的节俭序列化和反序列化

问题描述 投票:0回答:1

我使用节俭只是为了进行序列化和反序列化以提高性能,同时从卡夫卡流式传输字节数据

反序列化时,我不断收到此错误:

org.apache.thrift.protocol.TProtocolException:无法识别的类型123

我的代码

public void streamMessageByte() {   
    final StreamsBuilder builder = new StreamsBuilder();
    <Integer, byte[]> stream = builder.stream(kafka_topic);
    deserializer = new TDeserializer();
    serializer = new TSerializer();
    //Thrift class pojo object is 'deser' which matches byte array data format
    stream.map((k,v){

        try{
            deserializer.deserialize(deser, v);
        }

        catch(TException e){

        }
    null;
});
apache-kafka thrift
1个回答
0
投票

当我使用不同的协议进行序列化和反序列化时,我遇到了这个问题。

串行器使用ObjectMapper,解串器使用TDeserializerTBinaryProtocol。示例:

@Test
public void testSerDe() throws TException, JsonProcessingException {
   final Person person = new Person("Thomas", Byte.valueOf("23"));
   JsonSerializer serializer = new JsonSerializer();
   ObjectMapper mapper = new ObjectMapper();
   byte[] serialized = mapper.writeValueAsString(person).getBytes();
   TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
   Person desPerson = new Person();
   deserializer.deserialize(desPerson, serialized);
   assertEquals(person, desPerson);
}

会抛出org.apache.thrift.protocol.TProtocolException: Unrecognized type 123

如果您以相同的方式进行序列化和反序列化,它应该可以工作。这是一个例子:

@Test
public void testSerDe() throws TException {
   final Person person = new Person("Thomas", Byte.valueOf("23"));
   TSerializer serializer = new TSerializer(TBinaryProtocol::new);
   TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
   byte[] serializedPerson = serializer.serialize(person);
   GraphEvent dePerson = new Person();
   deserializer.deserialize(dePerson, serializedPerson);
   assertEquals(person, dePerson);
}
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.