我按照通常的方式创建了一个 Kafka Stream 拓扑。
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(...))
.filter(this::filterEvents)
.mapValues(this::mapToOtherEvent)
.to(outputTopic, Produced.with(...));
return builder.build();
}
方法
mapToOtherEvent
很少抛出业务逻辑异常,这会停止整个处理。我想知道是否有一种机制可以跳过此失败的业务逻辑错误并继续处理队列中的以下消息。到目前为止我探索过:
StreamsUncaughtExceptionHandler
,它所做的似乎只是处理异常并再次重播消息,陷入循环。ProductionExceptionHandler
和 DeserializationExceptionHandler
的内容,但它们似乎没有处理业务逻辑异常。在这种情况下,您可以从
mapValues
切换到 flatMapValues
并更改 mapToOtherEvent
来捕获异常并返回零结果值(即空集合),以有效地吞掉错误。
下一个版本(Apache Kafka 3.9)添加了一个名为
ProcessingExceptionHandler
的新处理程序,它允许您本机跳过此类错误,类似于现有的处理程序。