如何使用 java-rx 在 Observable 中重试错误?

问题描述 投票:0回答:1

我正在尝试学习 Reactive X Java,并且我真的很难理解错误处理的工作原理。我想要实现的目标很简单:一个Observable生成一个数据流,偶尔可能会遇到错误。我想将错误传递给订阅者,但不取消订阅 Observable,这是使用

onError
的结果。如果发生错误,订阅者只需以他们认为合适的方式记录/处理错误,并维持其订阅。听起来
retry
应该这样做,但我已经尝试了文档中可以找到的所有内容,但无法使重试功能正常工作:我总是最终收到此错误:

Exception in thread "main" io.reactivex.rxjava3.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.Exception: Error: 8

这是我现在所拥有的,重试不起作用:

public static void main(String[] args) {
    ConnectableObservable<Object> obs = Observable.create(subscriber -> {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("Producer produced data: " + i);
                if (i % 2 == 0) {
                    throw new Exception("Error: " + i);
                }
                subscriber.onNext(i);
                Thread.sleep(500);
            } catch (Exception e) {
                System.err.println("Producer Error: " + e.getMessage());
                subscriber.onError(e);
            }
        }
    }).publish();

    obs
            .retry(3)
            .subscribe(
                    data -> {
                        System.out.println("Subscriber 1 received data: " + data);
                    },
                    error -> {
                        System.err.println("Subscriber 1 Error: " + error.getMessage());
                    },
                    () -> {
                        System.out.println("Subscriber 1 Complete");
                    }
            );

    obs
            .retry(3)
            .subscribe(
                    data -> {
                        System.out.println("Subscriber 2 received data: " + data);
                    },
                    error -> {
                        System.err.println("Subscriber 2 Error: " + error.getMessage());
                    },
                    () -> {
                        System.out.println("Subscriber 2 Complete");
                    }
            );

    obs.connect();
}
java rx-java
1个回答
0
投票

问题是,如果发生错误,重试会重试整个可观察序列。

如果你想处理错误并继续发出项目,你应该使用

onErrorResumeNext
.

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;

public class RetryExample {
    public static void main(String[] args) {
        ConnectableObservable<Object> obs = Observable.create(subscriber -> {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("Producer produced data: " + i);
                    if (i % 2 == 0) {
                        throw new Exception("Error: " + i);
                    }
                    subscriber.onNext(i);
                    Thread.sleep(500);
                } catch (Exception e) {
                    System.err.println("Producer Error: " + e.getMessage());
                    subscriber.onError(e);
                }
            }
        }).publish();

        obs
            .onErrorResumeNext(Observable.empty())
            .subscribe(
                data -> {
                    System.out.println("Subscriber 1 received data: " + data);
                },
                error -> {
                    System.err.println("Subscriber 1 Error: " + error.getMessage());
                },
                () -> {
                    System.out.println("Subscriber 1 Complete");
                }
            );

        obs
            .onErrorResumeNext(Observable.empty())
            .subscribe(
                data -> {
                    System.out.println("Subscriber 2 received data: " + data);
                },
                error -> {
                    System.err.println("Subscriber 2 Error: " + error.getMessage());
                },
                () -> {
                    System.out.println("Subscriber 2 Complete");
                }
            );

        obs.connect();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.