反应式运算符设置超时行为,但仅当返回一定数量的项目时

问题描述 投票:0回答:1

我有一个场景,其中有来自仪器的可观察测量序列,仅当值发生一定量的变化时才会触发测量事件。

底层数据源引发 .NET 事件,而不是

IObservable<T>
流。我使用
IObservable<T>
将事件转换为
Observable.FromEventPattern
,它在订阅可观察对象时订阅事件处理程序,并在处理它时取消订阅它,并且还打开和关闭测量仪器,因此我需要确保我正确管理
Observable.FromEventPattern
返回的可观察对象的生命周期。

// Example of the FromEventPattern, with start/stop lifecycle measurement
// and logic to immediately return the current value when subscribed to
public static IObservable<Double> MeasurementObservable(this MyInstrument instrument)
{
    return Observable.Defer(
      () => Observable
        .FromEventPattern<NewMeasurementEventHandler, Double>(
            h =>
            {
                instrument.NewMeasurementEvent += h;
                instrument.StartMeasuring();
            },
            h =>
            {
                instrument.StopMeasuring();
                instrument.NewMeasurementEvent -= h;
            })
        .Select(eventPattern => eventPattern.EventArgs)
        // Return the current value immediately, then raise on events
        .Prepend(instrument.CurrentValue)
     );
}

我的应用程序的用户需要在应用程序中启动测量,然后手动启动仪器测量结果的过程。我的应用程序没有明确指示正在测量的过程已经实际上开始或结束,但我可以隐含地告诉我,当我开始从仪器获取大量更改事件时,它就开始了,并且我可以知道它已经结束当仪器停止引发测量变化事件时。

因此,我正在尝试编写一个 Reactive 运算符,它将等待,直到从 IObservable 接收到至少 N 测量值(这证明该进程实际上正在运行),并且 then 实现超时逻辑,优雅地完成测量并在停止接收新值时取消订阅底层源(这表明测量过程已完成)。这是我目前的尝试:

public static class ObservableExtensions
{
    /// <summary>
    /// After the Observable has yielded an initial set of items, implement a timeout policy
    /// which completes the Observable if no more items are observed for a specified time.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="source"></param>
    /// <param name="initialItems">The number of items which must be yielded before implementing the timeout policy.</param>
    /// <param name="dueTime">The timeout duration which signals the completion of the stream.</param>
    /// <returns></returns>
    public static IObservable<T> CompleteWhenUpdatesStop<T>(this IObservable<T> source, Int32 initialItems, TimeSpan dueTime)
    {
        var refCounted = source.Publish().RefCount();
        return refCounted.Take(initialItems)
            .Concat(refCounted.Timeout(dueTime, Observable.Empty<T>()));
    }
}

但是,这效果并不好。尽管我尝试使用

Publish().RefCount()
来确保底层
FromEventPattern
observable 在超时触发之前不会取消订阅,但实际发生的情况是
Take()
返回项目,然后处理底层订阅,然后创建新的订阅是对底层 Observable 进行的,但现在配置了超时。在我目前的情况下,这扰乱了事件生命周期管理,但也意味着如果您将此运算符与冷 Observable 一起使用,您可以获得重复的原始值。我也不希望这样 - 消费者应该只看到底层 Observable 发出的每个值一次。

实现超时逻辑的正确方法是什么,以便它仅在我收集了N初始项目后才生效?

c# .net system.reactive reactivex
1个回答
0
投票

看起来我可以通过手动管理底层 Observable 的生命周期来修复/解决生命周期管理问题,如下所示:

    public static IObservable<T> CompleteWhenUpdatesStop<T>(this IObservable<T> source, Int32 initialItems, TimeSpan dueTime)
    {
        return Observable.Defer(() =>
        {
            var published = source.Publish();
            var subs = published.Take(initialItems).Concat(published.Timeout(dueTime, Observable.Empty<T>()));
            var connection = published.Connect();
            return subs.Finally(() => connection.Dispose());
        });
    }

但是我认为这种丑陋,如果有人可以使用反应式运算符建议一个不太有状态的解决方案,我很乐意接受这个答案。我会暂时保留这个问题,看看是否有人有更好的想法。

© www.soinside.com 2019 - 2024. All rights reserved.