跳过在 kafka 流拓扑上抛出运行时错误的消息

问题描述 投票:0回答:1

我按照通常的方式创建了一个 Kafka Stream 拓扑。

public Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    builder.stream(inputTopic, Consumed.with(...))
        .filter(this::filterEvents)
        .mapValues(this::mapToOtherEvent)
        .to(outputTopic, Produced.with(...));

    return builder.build();
  }

方法

mapToOtherEvent
很少抛出业务逻辑异常,这会停止整个处理。我想知道是否有一种机制可以跳过此失败的业务逻辑错误并继续处理队列中的以下消息。到目前为止我探索过:

  • 实现
    StreamsUncaughtExceptionHandler
    ,它所做的似乎只是处理异常并再次重播消息,陷入循环。
  • 阅读有关
    ProductionExceptionHandler
    DeserializationExceptionHandler
    的内容,但它们似乎没有处理业务逻辑异常。
  • 这个答案仅建议代码解决方案,在我的情况下,如果我实现它们,将会产生其他效果
apache-kafka apache-kafka-streams
1个回答
0
投票

在这种情况下,您可以从

mapValues
切换到
flatMapValues
并更改
mapToOtherEvent
来捕获异常并返回零结果值(即空集合),以有效地吞掉错误。

下一个版本(Apache Kafka 3.9)添加了一个名为

ProcessingExceptionHandler
的新处理程序,它允许您本机跳过此类错误,类似于现有的处理程序。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.