Kafka反序列化键/值异常:记录原始传入消息

问题描述 投票:0回答:1

我正在学习Kafka如何处理异常。我有一个需要 JSON 的消息使用者 传入数据以便

JsonDeserializer
能够正常完成其工作。

如果我向 Kafka 主题发送无效的字符串内容,则会出现以下反序列化错误:

Error deserializing key/value for partition aaa.bbb.response-0 at offset 2.
If needed, please seek past the record to continue consumption.

那太好了。我抓住它并以这种方式记录它:

@Bean
public ConsumerFactory<String, EventEnvelop> consumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    log.debug("kafka consumer bootstrap addresses: {}", bootstrapAddresses);
    configs.forEach((key, value) -> log.debug("kafka consumer configuration: {\"{}\": \"{}\"", key, value));

    return new DefaultKafkaConsumerFactory<>(configs);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, EventEnvelop> kafkaListenerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, EventEnvelop> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerConfigs());
    factory.setConcurrency(1);
    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(3000);
    factory.setCommonErrorHandler(new KafkaConsumerErrorHandler());
    return factory;
}

我的消息消费者如下所示:

@KafkaListener(
        id = "response-topic-listener",
        topics = "${app.kafka.topic.response}",
        groupId = "response-group-1",
        containerFactory = "kafkaListenerFactory")
public void listen(EventEnvelop message) {
    log.info("new incoming message: {}", message);
}

这就是我捕获序列化错误的方法:

public class KafkaConsumerErrorHandler implements CommonErrorHandler {

    @Override
    public boolean handleOne(Exception exception,
                             ConsumerRecord<?, ?> record,
                             Consumer<?, ?> consumer,
                             MessageListenerContainer container) {
        return handle(exception, consumer);
    }

    @Override
    public void handleOtherException(Exception exception,
                                     Consumer<?, ?> consumer,
                                     MessageListenerContainer container,
                                     boolean batchListener) {
        handle(exception, consumer);
    }

    private boolean handle(Exception exception, Consumer<?,?> consumer) {
        if (exception instanceof RecordDeserializationException e) {
            log.debug("Incoming message: {}", getIncomingMessage(...); --> ??????
            log.error("Unable to parse the incoming record. {}", e.getMessage());
            consumer.seek(e.topicPartition(), e.offset() + 1L);
            consumer.commitSync();
        } else {
            log.error("An unexpected error occurred while trying to handle the incoming message: ", exception);
        }
        return false;
    }
}

我想记录解析器无法解析的原始传入消息。我尝试过寻找这个,也尝试过在调试模式下探索

consumer
container
对象,但没有运气。

+1

我的代码总是调用

handleOtherException
handleOne
方法从未被调用。

+2

消息顺序对我来说很重要,因此我在

setConcurrency(1)
配置上使用
setBatchListener(false)
ConcurrentKafkaListenerContainerFactory

是否有可能以某种方式获取原始传入消息?如何在我的代码中实现

getIncomingMessage(...)
方法?

java spring spring-boot apache-kafka
1个回答
0
投票

当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,引入了

ErrorHandlingDeserializer
。该解串器委托给真正的解串器(键或值)。如果委托无法反序列化记录内容,则
ErrorHandlingDeserializer
返回空值,并在包含原因和原始字节的标头中返回
DeserializationException

在您的情况下,处理上述异常将为您提供失败记录的原始字节。

请在此处查看详细信息和示例:https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#error-handling-deserializer

© www.soinside.com 2019 - 2024. All rights reserved.