我有一个 Rx 流,它将多个项目发送给观察者。 但我想修改发回的第一个项目。所有其他项目都可以按原样发送。
我知道
map()
会拦截所有项目,但是我必须保留当前正在发出的项目的计数器。
有没有办法只对第一个项目执行此操作?
谢谢。
试试这个
假设你有一个字符串流
["First", "Second", "Third", "Fourth"]
并且你只需要修改第一个项目
Observable<String> stringObservable = Observable.just("First", "Second", "Third", "Fourth").publish().refCount();
stringObservable.skip(1)
.startWith(stringObservable.take(1).map(s -> "Modified"))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
结果将是
Modified
Second
Third
Fourth
这是一个更强大的版本,可以处理上游是热可观察对象的情况。
upstream.publish(stream ->
Observable.concatEager(
List.of(
stream.take(1).map(s -> "Modified"),
stream.skip(1))
)
)
.subscribe(System.out::println);
这样
upstream
就可以很热。例如:
Observable.create(emitter -> {
List<String> values = List.of("First", "Second", "Third", "Fourth");
for (String value : values) {
if (emitter.isDisposed())
break;
emitter.onNext(value);
}
})
.publish(...as above...)