(Rx.Net) 缓存数据直到 IObserver 订阅

问题描述 投票:0回答:3

我有一个热 Observable (Rx.Net),当它检测到设备与 PC 的连接时,它会发出滴答声。我有一个处理这些设备显示的视图。不幸的是,视图可能要等到设备连接后才会显示,因此会丢失通知。

我想知道是否有办法缓存设备连接直到有人订阅。我想过使用

ReplaySubject
,但是它会不断添加到缓存中,如果在应用程序会话的生命周期内有太多设备连接,那么它并不理想。

system.reactive
3个回答
0
投票

Observables 使用延迟执行,并且在被订阅之前不会生成值。

所以,你有两个选择。

(1) 仅在订阅时连接到设备。

IObservable<Unit> query =
    Observable
        .Using(() => new Resource(), r => r.Values);

Using
操作正是这样做的。
Defer
也是如此。两者都可以用来延迟连接,直到有订阅为止。

(2) 如果设备在您订阅之前生成值,则使用常规的非 Rx 代码来缓存这些值,然后在订阅发生时让 observable 耗尽缓存。

如果您需要的答案不止于此,您需要提供显示您面临的问题的代码,然后我可以编写可能解决您的问题的代码。


0
投票

这是我用于相同目的的代码。请注意,如果没有订户到达 - 应通过发出“stopMe”参数信号来显式停止它。

行为如下:

  1. 开始收集物品直至订阅

  2. 订阅后 - 释放收集的物品

  3. 如果取消订阅 - 继续收集物品直到下次订阅!

  4. 一旦触发 - 应通过 stopMe 参数显式停止。

    public static IObservable<T> CollectUntilSubscribed<T, TUntil>(this IObservable<T> source, IObservable<TUntil> stopMe)
    {
     var subscribersCount = 0;
    
     var ready = new Subject<Unit>();
    
     var delayed = source
         .Delay(x => subscribersCount > 0 ? Observable.Return(Unit.Default):ready)
         .Publish()
         ;
    
     var connection = delayed.Connect();
    
     stopMe.Take(1).Subscribe(_ => connection.Dispose());
    
     return Observable.Create<T>(o =>
     {
         var sub = delayed.Subscribe(o);
    
         if (Interlocked.Increment(ref subscribersCount) == 1) ready.OnNext(Unit.Default);
    
         return Disposable.Create(() =>
         {
             Interlocked.Decrement(ref subscribersCount);
             sub.Dispose();
         });
     });
    }
    

-1
投票

鉴于视图总是会被显示,人们可能会这样想:

connections.Buffer(view.WhenDisplayed()).Take(1).SelectMany(buffer=>buffer.Concat(connections))
© www.soinside.com 2019 - 2024. All rights reserved.