我有一个 Spring Boot 应用程序,它有一个带有 @KafkaListener 的简单 Consumer。我有阻塞重试逻辑,可以按预期工作,但如果出现反序列化异常,我想将损坏的消息存储在 DLT 主题中,以便稍后手动分析和处理。
我正在使用 DefaultErrorHandler,如果出现反序列化异常,我的代码将进入此块,而无需任何重试尝试,这是预期的行为。但是,问题出在我的 errorHandler 中的 consumerRecord 参数。我没有在 ConsumerRecord 中收到消息,因此消息在 DLT 中保存为 null 。我想以某种方式保留损坏的消息并将其发布到 DLT。
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
alerter.alert(exception,"Message Consumption Failed : " + consumerRecord.value() + " , sent to dead letter topic .");
publisherService.sendMessageToDlq(DLQ_TOPIC, consumerRecord.value());}, fixedBackOff);
consumerRecord.value()为空:(
假设您正在使用
ErrorHandlingDeserializer
,请参阅框架 DeadLetterPublishingRecoverer
中的逻辑,了解它如何从标头(具有原始数据作为属性)获取 DeserializationException
。
抱歉,mb 你找到这个问题的答案了吗?