我有一个热 Observable (Rx.Net),当它检测到设备与 PC 的连接时,它会发出滴答声。我有一个处理这些设备显示的视图。不幸的是,视图可能要等到设备连接后才会显示,因此会丢失通知。
我想知道是否有办法缓存设备连接直到有人订阅。我想过使用
ReplaySubject
,但是它会不断添加到缓存中,如果在应用程序会话的生命周期内有太多设备连接,那么它并不理想。
Observables 使用延迟执行,并且在被订阅之前不会生成值。
所以,你有两个选择。
(1) 仅在订阅时连接到设备。
IObservable<Unit> query =
Observable
.Using(() => new Resource(), r => r.Values);
Using
操作正是这样做的。 Defer
也是如此。两者都可以用来延迟连接,直到有订阅为止。
(2) 如果设备在您订阅之前生成值,则使用常规的非 Rx 代码来缓存这些值,然后在订阅发生时让 observable 耗尽缓存。
如果您需要的答案不止于此,您需要提供显示您面临的问题的代码,然后我可以编写可能解决您的问题的代码。
这是我用于相同目的的代码。请注意,如果没有订户到达 - 应通过发出“stopMe”参数信号来显式停止它。
行为如下:
开始收集物品直至订阅
订阅后 - 释放收集的物品
如果取消订阅 - 继续收集物品直到下次订阅!
一旦触发 - 应通过 stopMe 参数显式停止。
public static IObservable<T> CollectUntilSubscribed<T, TUntil>(this IObservable<T> source, IObservable<TUntil> stopMe)
{
var subscribersCount = 0;
var ready = new Subject<Unit>();
var delayed = source
.Delay(x => subscribersCount > 0 ? Observable.Return(Unit.Default):ready)
.Publish()
;
var connection = delayed.Connect();
stopMe.Take(1).Subscribe(_ => connection.Dispose());
return Observable.Create<T>(o =>
{
var sub = delayed.Subscribe(o);
if (Interlocked.Increment(ref subscribersCount) == 1) ready.OnNext(Unit.Default);
return Disposable.Create(() =>
{
Interlocked.Decrement(ref subscribersCount);
sub.Dispose();
});
});
}
鉴于视图总是会被显示,人们可能会这样想:
connections.Buffer(view.WhenDisplayed()).Take(1).SelectMany(buffer=>buffer.Concat(connections))