我有点困惑。
问题在于通过重试创建反应式调用链。它进入步骤 3,而不保存步骤 2 中重试的值。如果我删除 onErrorContinue,它将成功重试保存,但我错过了“onErrorContinue”功能。
我做什么:
仅举个例子
Flux.fromIterable(s) // actually from kafka
.flatMap(kk -> eventService.save(kk)
.doOnNext(it -> record.receiverOffset().acknowledge()))
.onErrorContinue((ex, record) -> {
log.error("Exception during consumer", ex);
if (record instanceof ReceiverRecord failedRecord &&
ex instanceof DeserializationException exception) {
log.error("Failed record details {} and exception {}", record, exception);
failedRecord.receiverOffset().acknowledge();
} else {
log.error("Some unexpected error type! Cant move offset", ex);
}
})
.doOnError(e -> log.error("Exception during consumer", e))
.retry()
.subscribe();
eventService.save 看起来像。它按预期工作(异常时重试)
public Mono<SomeModel> save(SomeModel someModel) {
return Mono.defer(() -> repository.save(someModel))
.retryWhen(Retry
.backoff(1, Duration.ofMillis(100))
.maxBackoff(Duration.ofMillis(5000)));
}
主要冲突在于
retry()
和 onErrorContinue()
运算符之间。
retry()
的目的是在发生错误时重复整个操作,但 onErrorContinue()
会跳过错误而不重试它们。
尝试以下代码片段;
Flux.fromIterable(s) // assuming this is from Kafka
.flatMap(kk -> eventService.save(kk)
.retryWhen(Retry.backoff(3, Duration.ofMillis(100)) // Retry logic
.filter(ex -> ex instanceof SpecificRetryableException)) // Define retryable exceptions
.doOnNext(it -> record.receiverOffset().acknowledge())
.onErrorResume(ex -> { // Handle non-retryable exceptions (skip the record)
log.error("Skipping record due to error", ex);
return Mono.empty(); // Skips the record on error
})
)
.onErrorContinue((ex, record) -> {
log.error("Exception during consumer", ex);
if (record instanceof ReceiverRecord failedRecord && ex instanceof DeserializationException exception) {
log.error("Failed record details {} and exception {}", record, exception);
failedRecord.receiverOffset().acknowledge(); // Skip the record and move the offset
} else {
log.error("Some unexpected error type! Can't move offset", ex);
}
})
.doOnError(e -> log.error("Exception during consumer", e))
.subscribe();
filter()
来定位可重试错误