Kafka:ClassCastException:类 org.apache.avro.generic.GenericData$Record 无法转换为类

问题描述 投票:0回答:3

当尝试将 record.value() 转换为 java 对象时,我在消费者中遇到了这个异常:

ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast  to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')

生产者发送java对象,这是一个名为PublicActivityRecord

的用户定义类型,如下所示:

KafkaProducer<String, PublicActivityRecord> producer = new KafkaProducer<>(createKafkaProperties()); [...] this.producer.send(new ProducerRecord<String, PublicActivityRecord>(myTopic, activityRecord)); this.producer.flush();
此时我可以在调试模式下看到

ProducerRecord

的值确实是
PublicActivityRecord
类型。

在注册服务器上,我可以在日志中看到生产者发送模式的 POST 请求:

Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262) [2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 +0000] "POST /subjects/DEV-INF_9325_activityRecord_01-value/versions HTTP/1.1" 200 8 "-" "Java/11.0.2" POSTsT (io.confluent.rest-utils.requests:62)

消费者方面:

protected KafkaConsumer<String, PublicActivityRecord> consumer; [...] consumer = new KafkaConsumer<>(consumerProperties); consumer.subscribe(Stream.of(kafkaConfig.getTopicActivityRecord()).collect(Collectors.toList())); final ConsumerRecords<String, PublicActivityRecord> records = consumer.poll(duration); records.forEach(record -> { [...] PublicActivityRecord activityRecord = record.value();
这里发生了ClassCastException。

在调试模式下,我可以看到

record.value

确实是
GenericData$Record
类型。并且它不能转换为 PublicActivityRecord。

序列化器/反序列化器的键和值是相同的:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
在 schema-registry 日志中,我可以看到消费者的 GET 请求:

"GET /schemas/ids/3?fetchMaxId=false HTTP/1.1" 200 8447 "-" "Java/11.0.7" GETsT (io.confluent.rest-utils.requests:62)
所以我已经检查过:

    生产者发送一条带有我自己类型的消息
  1. PublicActivityRecord
    
    
  2. kafka Broker 收到消息
  3. 生产者将模式发布到模式注册表
  4. 消息被消费者获取
  5. 模式由消费者从模式注册表中获取
  6. 消息的价值是意想不到的
  7. GenericData$Record
    
    
这导致我得出的结果是我的消费者出了问题。

所以问题是:为什么消费者得到的是

GenericData

 记录而不是预期的 
PublicActivityRecord

任何线索将不胜感激!

java apache-kafka avro classcastexception confluent-schema-registry
3个回答
25
投票
默认情况下,仅返回通用记录。您需要设置

value.deserializer.specific.avro.reader=true
或者,在您的消费者配置中使用常量

KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG

=真


0
投票
我们在 Apache Flink 中使用 AVRO 反序列化 Kafka 记录时遇到了同样的错误。原因是我们重命名了 AVRO 工具生成的 POJO 对象的名称空间。因此模式和 POJO 对象的命名空间不同。


0
投票
我添加了此配置,它对我有用。

.consumerProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,true)

示例:

@Bean public ReceiverOptions<String, OrderRequestModel> receiverOptions() { Map<String, Object> propetiesMap = new HashMap<>(); propetiesMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); propetiesMap.put("schema.registry.url", "http://localhost:8081"); propetiesMap.put(ConsumerConfig.GROUP_ID_CONFIG, group); propetiesMap.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "1"); propetiesMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); propetiesMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propetiesMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); return ReceiverOptions.<String, OrderRequestModel>create(propetiesMap) .consumerProperty(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderRequestModel.class) .consumerProperty(JsonDeserializer.USE_TYPE_INFO_HEADERS, false) .consumerProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true) .subscription(List.of(topic)); }
    
© www.soinside.com 2019 - 2024. All rights reserved.