我使用节俭只是为了进行序列化和反序列化以提高性能,同时从卡夫卡流式传输字节数据
反序列化时,我不断收到此错误:
org.apache.thrift.protocol.TProtocolException:无法识别的类型123
我的代码
public void streamMessageByte() {
final StreamsBuilder builder = new StreamsBuilder();
<Integer, byte[]> stream = builder.stream(kafka_topic);
deserializer = new TDeserializer();
serializer = new TSerializer();
//Thrift class pojo object is 'deser' which matches byte array data format
stream.map((k,v){
try{
deserializer.deserialize(deser, v);
}
catch(TException e){
}
null;
});
当我使用不同的协议进行序列化和反序列化时,我遇到了这个问题。
串行器使用ObjectMapper
,解串器使用TDeserializer
和TBinaryProtocol
。示例:
@Test
public void testSerDe() throws TException, JsonProcessingException {
final Person person = new Person("Thomas", Byte.valueOf("23"));
JsonSerializer serializer = new JsonSerializer();
ObjectMapper mapper = new ObjectMapper();
byte[] serialized = mapper.writeValueAsString(person).getBytes();
TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
Person desPerson = new Person();
deserializer.deserialize(desPerson, serialized);
assertEquals(person, desPerson);
}
会抛出org.apache.thrift.protocol.TProtocolException: Unrecognized type 123
如果您以相同的方式进行序列化和反序列化,它应该可以工作。这是一个例子:
@Test
public void testSerDe() throws TException {
final Person person = new Person("Thomas", Byte.valueOf("23"));
TSerializer serializer = new TSerializer(TBinaryProtocol::new);
TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
byte[] serializedPerson = serializer.serialize(person);
GraphEvent dePerson = new Person();
deserializer.deserialize(dePerson, serializedPerson);
assertEquals(person, dePerson);
}