我试图得到给定的Observable
的最新值,并在它被调用后立即发出它。以下面的代码为例:
return Observable.just(myObservable.last())
.flatMap(myObservable1 -> {
return myObservable1;
})
.map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object
这不起作用,因为通过这样做,flatMap
将发射myObservable1
,反过来将发射到达map
。我不知道是否可以做这样的事情。有没有人知道如何实现这一目标?谢谢
last()
方法在这里没有任何帮助,因为它等待Observable终止给你最后一个项目。
假设您没有对发射的observable进行控制,您可以简单地创建一个BehaviorSubject
并将其订阅到发出您想要侦听的数据的observable,然后订阅创建的主题。由于Subject
既是Observable
又是Subscriber
,你会得到你想要的。
我认为(现在没有时间检查)你可能必须手动取消订阅原始的observable作为BehaviorSubject
,一旦他的所有订阅者取消订阅都不会自动取消订阅。
像这样的东西:
BehaviorSubject subject = new BehaviorSubject();
hotObservable.subscribe(subject);
subject.subscribe(thing -> {
// Here just after subscribing
// you will receive the last emitted item, if there was any.
// You can also always supply the first item to the behavior subject
});
http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html
在RxJava中,subscriber.onXXX被称为异步。这意味着如果您的Observable在新线程中发出项目,则在返回之前永远不能获取最后一项,除非您阻止该线程并等待该项目。但是如果Observable同步发出项目并且你不会通过subscribeOn和observOn改变它的线程,例如代码:
Observable.just(1,2,3).subscribe();
在这种情况下,您可以通过以下方式获取最后一项:
Integer getLast(Observable<Integer> o){
final int[] ret = new int[1];
Observable.last().subscribe(i -> ret[0] = i);
return ret[0];
}
这样做是个坏主意.RxJava更喜欢你做它的异步工作。
你真正想要实现的是获取异步任务并将其转换为同步任务。
有几种方法可以实现它,每种方法都有它的优点和缺点:
Observable<T> getData();
然后立即获得最后一个值的方法将如下所示:public T getLastItem(){
return getData().toBlocking().first();
}
请不要使用last(),因为它将等待流完成,然后才会发出最后一项。
如果您的流是一个网络请求,但它没有得到任何项目,这将阻止您的线程!,所以只有在您确定有一个项目可用时(或者如果您真的想要一个块...)时才使用它!
如果您在请求时没有发送任何项目,您将获得null或您设置的任何初始值。这个approch的问题是订阅阶段可能在get之后发生,如果在2个不同的线程中使用,可能会产生竞争条件。