我正在学习Kafka如何处理异常。我有一个需要 JSON 的消息使用者 传入数据以便
JsonDeserializer
能够正常完成其工作。
如果我向 Kafka 主题发送无效的字符串内容,则会出现以下反序列化错误:
Error deserializing key/value for partition aaa.bbb.response-0 at offset 2.
If needed, please seek past the record to continue consumption.
那太好了。我抓住它并以这种方式记录它:
@Bean
public ConsumerFactory<String, EventEnvelop> consumerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
log.debug("kafka consumer bootstrap addresses: {}", bootstrapAddresses);
configs.forEach((key, value) -> log.debug("kafka consumer configuration: {\"{}\": \"{}\"", key, value));
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, EventEnvelop> kafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, EventEnvelop> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerConfigs());
factory.setConcurrency(1);
factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(3000);
factory.setCommonErrorHandler(new KafkaConsumerErrorHandler());
return factory;
}
我的消息消费者如下所示:
@KafkaListener(
id = "response-topic-listener",
topics = "${app.kafka.topic.response}",
groupId = "response-group-1",
containerFactory = "kafkaListenerFactory")
public void listen(EventEnvelop message) {
log.info("new incoming message: {}", message);
}
这就是我捕获序列化错误的方法:
public class KafkaConsumerErrorHandler implements CommonErrorHandler {
@Override
public boolean handleOne(Exception exception,
ConsumerRecord<?, ?> record,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
return handle(exception, consumer);
}
@Override
public void handleOtherException(Exception exception,
Consumer<?, ?> consumer,
MessageListenerContainer container,
boolean batchListener) {
handle(exception, consumer);
}
private boolean handle(Exception exception, Consumer<?,?> consumer) {
if (exception instanceof RecordDeserializationException e) {
log.debug("Incoming message: {}", getIncomingMessage(...); --> ??????
log.error("Unable to parse the incoming record. {}", e.getMessage());
consumer.seek(e.topicPartition(), e.offset() + 1L);
consumer.commitSync();
} else {
log.error("An unexpected error occurred while trying to handle the incoming message: ", exception);
}
return false;
}
}
我想记录解析器无法解析的原始传入消息。我尝试过寻找这个,也尝试过在调试模式下探索
consumer
和 container
对象,但没有运气。
+1
我的代码总是调用
handleOtherException
而 handleOne
方法从未被调用。
+2
消息顺序对我来说很重要,因此我在
setConcurrency(1)
配置上使用 setBatchListener(false)
和 ConcurrentKafkaListenerContainerFactory
。
是否有可能以某种方式获取原始传入消息?如何在我的代码中实现
getIncomingMessage(...)
方法?
当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,引入了
ErrorHandlingDeserializer
。该解串器委托给真正的解串器(键或值)。如果委托无法反序列化记录内容,则 ErrorHandlingDeserializer
返回空值,并在包含原因和原始字节的标头中返回 DeserializationException
。
在您的情况下,处理上述异常将为您提供失败记录的原始字节。
请在此处查看详细信息和示例:https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#error-handling-deserializer