Reactor 跳过步骤而不重试

问题描述 投票:0回答:1

我有点困惑。

问题在于通过重试创建反应式调用链。它进入步骤 3,而不保存步骤 2 中重试的值。如果我删除 onErrorContinue,它将成功重试保存,但我错过了“onErrorContinue”功能。

我做什么:

  1. 输入:通量
  2. 在 flatMap 中调用存储库保存模型,并在异常时重试
  3. 如果 SomeModel 在第 2 步失败,我需要跳过它。 (实际上它是反应式kafka监听器)

仅举个例子

            Flux.fromIterable(s) // actually from kafka
                    .flatMap(kk -> eventService.save(kk)
                        .doOnNext(it -> record.receiverOffset().acknowledge()))                
            .onErrorContinue((ex, record) -> {
                log.error("Exception during consumer", ex);
                if (record instanceof ReceiverRecord failedRecord &&
                        ex instanceof DeserializationException exception) {
                    log.error("Failed record details {} and exception {}", record, exception);
                    failedRecord.receiverOffset().acknowledge();
                } else {
                    log.error("Some unexpected error type! Cant move offset", ex);
                }
            })
            .doOnError(e -> log.error("Exception during consumer", e))
            .retry()
            .subscribe();

eventService.save 看起来像。它按预期工作(异常时重试)

public Mono<SomeModel> save(SomeModel someModel) {
    return Mono.defer(() -> repository.save(someModel))
            .retryWhen(Retry
                    .backoff(1, Duration.ofMillis(100))
                    .maxBackoff(Duration.ofMillis(5000)));

}
spring-webflux project-reactor
1个回答
0
投票

主要冲突在于

retry()
onErrorContinue()
运算符之间。
retry()
的目的是在发生错误时重复整个操作,但
onErrorContinue()
会跳过错误而不重试它们。

尝试以下代码片段;


Flux.fromIterable(s) // assuming this is from Kafka
    .flatMap(kk -> eventService.save(kk)
        .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) // Retry logic
            .filter(ex -> ex instanceof SpecificRetryableException)) // Define retryable exceptions
        .doOnNext(it -> record.receiverOffset().acknowledge())
        .onErrorResume(ex -> { // Handle non-retryable exceptions (skip the record)
            log.error("Skipping record due to error", ex);
            return Mono.empty(); // Skips the record on error
        })
    )
    .onErrorContinue((ex, record) -> {
        log.error("Exception during consumer", ex);
        if (record instanceof ReceiverRecord failedRecord && ex instanceof DeserializationException exception) {
            log.error("Failed record details {} and exception {}", record, exception);
            failedRecord.receiverOffset().acknowledge(); // Skip the record and move the offset
        } else {
            log.error("Some unexpected error type! Can't move offset", ex);
        }
    })
    .doOnError(e -> log.error("Exception during consumer", e))
    .subscribe();


  • 重试逻辑:在 flatmap 中定义,允许使用 retrywhen() 重试特定异常,并使用自定义
    filter()
    来定位可重试错误
© www.soinside.com 2019 - 2024. All rights reserved.