我有一个场景,其中有来自仪器的可观察测量序列,仅当值发生一定量的变化时才会触发测量事件。
底层数据源引发 .NET 事件,而不是
IObservable<T>
流。我使用 IObservable<T>
将事件转换为 Observable.FromEventPattern
,它在订阅可观察对象时订阅事件处理程序,并在处理它时取消订阅它,并且还打开和关闭测量仪器,因此我需要确保我正确管理 Observable.FromEventPattern
返回的可观察对象的生命周期。
// Example of the FromEventPattern, with start/stop lifecycle measurement
// and logic to immediately return the current value when subscribed to
public static IObservable<Double> MeasurementObservable(this MyInstrument instrument)
{
return Observable.Defer(
() => Observable
.FromEventPattern<NewMeasurementEventHandler, Double>(
h =>
{
instrument.NewMeasurementEvent += h;
instrument.StartMeasuring();
},
h =>
{
instrument.StopMeasuring();
instrument.NewMeasurementEvent -= h;
})
.Select(eventPattern => eventPattern.EventArgs)
// Return the current value immediately, then raise on events
.Prepend(instrument.CurrentValue)
);
}
我的应用程序的用户需要在应用程序中启动测量,然后手动启动仪器测量结果的过程。我的应用程序没有明确指示正在测量的过程已经实际上开始或结束,但我可以隐含地告诉我,当我开始从仪器获取大量更改事件时,它就开始了,并且我可以知道它已经结束当仪器停止引发测量变化事件时。
因此,我正在尝试编写一个 Reactive 运算符,它将等待,直到从 IObservable 接收到至少 N 测量值(这证明该进程实际上正在运行),并且 then 实现超时逻辑,优雅地完成测量并在停止接收新值时取消订阅底层源(这表明测量过程已完成)。这是我目前的尝试:
public static class ObservableExtensions
{
/// <summary>
/// After the Observable has yielded an initial set of items, implement a timeout policy
/// which completes the Observable if no more items are observed for a specified time.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source"></param>
/// <param name="initialItems">The number of items which must be yielded before implementing the timeout policy.</param>
/// <param name="dueTime">The timeout duration which signals the completion of the stream.</param>
/// <returns></returns>
public static IObservable<T> CompleteWhenUpdatesStop<T>(this IObservable<T> source, Int32 initialItems, TimeSpan dueTime)
{
var refCounted = source.Publish().RefCount();
return refCounted.Take(initialItems)
.Concat(refCounted.Timeout(dueTime, Observable.Empty<T>()));
}
}
但是,这效果并不好。尽管我尝试使用
Publish().RefCount()
来确保底层 FromEventPattern
observable 在超时触发之前不会取消订阅,但实际发生的情况是 Take()
返回项目,然后处理底层订阅,然后创建新的订阅是对底层 Observable 进行的,但现在配置了超时。在我目前的情况下,这扰乱了事件生命周期管理,但也意味着如果您将此运算符与冷 Observable 一起使用,您可以获得重复的原始值。我也不希望这样 - 消费者应该只看到底层 Observable 发出的每个值一次。
实现超时逻辑的正确方法是什么,以便它仅在我收集了N初始项目后才生效?
看起来我可以通过手动管理底层 Observable 的生命周期来修复/解决生命周期管理问题,如下所示:
public static IObservable<T> CompleteWhenUpdatesStop<T>(this IObservable<T> source, Int32 initialItems, TimeSpan dueTime)
{
return Observable.Defer(() =>
{
var published = source.Publish();
var subs = published.Take(initialItems).Concat(published.Timeout(dueTime, Observable.Empty<T>()));
var connection = published.Connect();
return subs.Finally(() => connection.Dispose());
});
}
但是我认为这种丑陋,如果有人可以使用反应式运算符建议一个不太有状态的解决方案,我很乐意接受这个答案。我会暂时保留这个问题,看看是否有人有更好的想法。