我正在尝试实现动态 Kafka 异常处理,特别是在某些条件下重试,但找不到任何方法来过滤不应该仅由其类重试的异常。我的错误处理程序代码如下所示:
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(consumerRecord, e) -> {
log.error("Exception happened for {}", consumerRecord, e);
},
exponentialBackOff);
errorHandler.addNotRetryableExceptions(NotRetryableException.class);
但我想更精细地控制什么是不可重试的异常:
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(consumerRecord, e) -> {
log.error("Exception happened for {}", consumerRecord, e);
},
exponentialBackOff);
errorHandler.addNotRetryableExceptions(NotRetryableException.class);
errorHandler.addNotRetryableExceptionFilter(throwable -> throwable.getMessage().startsWith("foo"));
我在标准实现中找不到这样的功能。我检查了来源并发现 BinaryExceptionClassifier#classify 它检查致命/非致命异常,但也许自定义它并不是处理它的最佳方法。
有什么方法可以有效地使用过滤器来检查期望是否可重试?
考虑使用该错误处理程序的 API:
/**
* Set a function to dynamically determine the {@link BackOff} to use, based on the
* consumer record and/or exception. If null is returned, the default BackOff will be
* used.
* @param backOffFunction the function.
* @since 2.6
*/
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#default-eh