RxJava专家,这是您发光的机会!
您是否可以通过仅更改IllegalStateException
方法中以Flowable.generate()
开头的RxJava管道来确保以下程序不会抛出main()
?
class ExportJob {
private static Scheduler singleThread(String threadName) {
return Schedulers.from(newFixedThreadPool(1, r -> {
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
}));
}
public static void main(String[] args) {
Scheduler genSched = singleThread("genThread");
Scheduler mapSched = singleThread("mapThread");
// execute on "genThread"
Flowable.generate(ExportJob::infiniteGenerator)
.subscribeOn(genSched, false)
// execute on "mapThread"
.observeOn(mapSched, false)
.concatMapMaybe(ExportJob::mapping)
// execute on the thread that creates the pipeline, block it until finished
.blockingForEach(ExportJob::terminal);
}
private static int nb;
/** Must execute on "genThread" thread. */
private static void infiniteGenerator(Emitter<Integer> emitter) {
print(nb, "infiniteGenerator");
emitter.onNext(nb++);
checkCurrentThread("genThread");
}
/** Must execute on "mapThread" thread. */
private static Maybe<Integer> mapping(Integer s) {
print(s, "mapping");
checkCurrentThread("mapThread");
return Maybe.just(s);
}
/** Must execute on "terminal" thread. */
private static void terminal(Integer s) {
print(s, "terminal");
checkCurrentThread("main");
}
private static void print(int item, String method) {
System.out.format("%d - %s - %s()%n", item, Thread.currentThread().getName(), method);
}
private static void checkCurrentThread(String expectedThreadName) throws IllegalStateException {
String name = Thread.currentThread().getName();
if (!name.equals(expectedThreadName)) {
throw new IllegalStateException("Thread changed from '" + expectedThreadName + "' to '" + name + "'");
}
}
}