我正在尝试学习 Reactive X Java,并且我真的很难理解错误处理的工作原理。我想要实现的目标很简单:一个Observable生成一个数据流,偶尔可能会遇到错误。我想将错误传递给订阅者,但不取消订阅 Observable,这是使用
onError
的结果。如果发生错误,订阅者只需以他们认为合适的方式记录/处理错误,并维持其订阅。听起来 retry
应该这样做,但我已经尝试了文档中可以找到的所有内容,但无法使重试功能正常工作:我总是最终收到此错误:
Exception in thread "main" io.reactivex.rxjava3.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.Exception: Error: 8
这是我现在所拥有的,重试不起作用:
public static void main(String[] args) {
ConnectableObservable<Object> obs = Observable.create(subscriber -> {
for (int i = 0; i < 10; i++) {
try {
System.out.println("Producer produced data: " + i);
if (i % 2 == 0) {
throw new Exception("Error: " + i);
}
subscriber.onNext(i);
Thread.sleep(500);
} catch (Exception e) {
System.err.println("Producer Error: " + e.getMessage());
subscriber.onError(e);
}
}
}).publish();
obs
.retry(3)
.subscribe(
data -> {
System.out.println("Subscriber 1 received data: " + data);
},
error -> {
System.err.println("Subscriber 1 Error: " + error.getMessage());
},
() -> {
System.out.println("Subscriber 1 Complete");
}
);
obs
.retry(3)
.subscribe(
data -> {
System.out.println("Subscriber 2 received data: " + data);
},
error -> {
System.err.println("Subscriber 2 Error: " + error.getMessage());
},
() -> {
System.out.println("Subscriber 2 Complete");
}
);
obs.connect();
}
问题是,如果发生错误,重试会重试整个可观察序列。
如果你想处理错误并继续发出项目,你应该使用
onErrorResumeNext
.
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
public class RetryExample {
public static void main(String[] args) {
ConnectableObservable<Object> obs = Observable.create(subscriber -> {
for (int i = 0; i < 10; i++) {
try {
System.out.println("Producer produced data: " + i);
if (i % 2 == 0) {
throw new Exception("Error: " + i);
}
subscriber.onNext(i);
Thread.sleep(500);
} catch (Exception e) {
System.err.println("Producer Error: " + e.getMessage());
subscriber.onError(e);
}
}
}).publish();
obs
.onErrorResumeNext(Observable.empty())
.subscribe(
data -> {
System.out.println("Subscriber 1 received data: " + data);
},
error -> {
System.err.println("Subscriber 1 Error: " + error.getMessage());
},
() -> {
System.out.println("Subscriber 1 Complete");
}
);
obs
.onErrorResumeNext(Observable.empty())
.subscribe(
data -> {
System.out.println("Subscriber 2 received data: " + data);
},
error -> {
System.err.println("Subscriber 2 Error: " + error.getMessage());
},
() -> {
System.out.println("Subscriber 2 Complete");
}
);
obs.connect();
}
}