如何处理spring-kafka中批量监听器的验证和转换失败

问题描述 投票:0回答:1

我们使用来自 spring-kafka 的批处理侦听器以及自动解析为 Java dtos 的 JSON 负载,因此我们的侦听器方法目前看起来像这样:

@KafkaListener(topics = MY_TOPIC, containerFactory = "kafkaListenerContainerFactoryWithBatching",
               properties = {"max.poll.records=500"})
public void failingBatchReceiver(@Payload List<Dto> messages) {
    // ...
}

我们的 dtos 使用 JSR 303 bean 验证注释进行注释,我们希望记录并忽略任何无效消息。

对于记录监听器,我们可以将

@Valid
放在
@Payload
上,但这意味着如果整个批次包含一条无效消息,我们将拒绝该批次。

问题 1:有没有比注入

Validator
、验证每条消息、记录错误并忽略每个侦听器中处理的消息更简单的方法?


另一个问题是转换失败:如果我们传入无效的 JSON(例如意外格式的日期),spring-kafka 将传入

null
消息而不记录任何内容。这出乎我们的意料。我们现在意识到原始异常在
CONVERSION_FAILURES
标头中可用。同样,我们需要在每个侦听器中注入此标头。如果我们注意到
null
有效负载,我们会检查标头列表的索引,然后我们可以记录错误。为了进行调查,我们还想记录消息的主题、分区和偏移量,因此我们还必须将这些标头注入每个侦听器中。

问题2:有没有更简单的方法让这些转换失败可见?

问题 3:是否可以将所有标头注入一个参数中,而不是像 https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation 所示为每个标头使用一个参数.html#batch-listeners 就像我们可以用

ConsumerRecordMetadata
来录制监听器一样? 这将使将有效负载和标头传递给某些辅助方法变得更简单,我们可以使用这些方法来执行所有这些检查、记录错误并删除无效消息。

spring spring-kafka
1个回答
0
投票

问题 1:是否有比注入验证器、验证每条消息、记录错误并忽略每个侦听器中处理的消息更简单的方法?

还是没找到。

问题2:有没有更简单的方法让这些转换失败可见?

创建https://github.com/spring-projects/spring-kafka/issues/3555,但我们可能只会处理批处理程序中的错误(提取到通用帮助程序)。

问题 3:是否可以将所有标头注入一个参数中,而不是像 https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation 所示为每个标头使用一个参数.html#batch-listeners 就像我们可以使用 ConsumerRecordMetadata 来记录监听器一样?这将使将有效负载和标头传递给某些辅助方法变得更简单,我们可以使用这些方法来执行所有这些检查、记录错误并删除无效消息。

是的,我们也可以使用批处理侦听器注入

@Headers Map<String, Object> headers

© www.soinside.com 2019 - 2024. All rights reserved.