我想要实现的目标可以描述如下:
Where
运算符)Sample
运算符应用于原始流概念看起来像这样:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var s = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().AutoConnect();
var s1 = s.Where(x => x % 5 == 0);
var s2 = s.Sample(TimeSpan.FromMilliseconds(1000));
new[] {s1, s2}.Merge()./*Distinct().*/Subscribe(Console.WriteLine, cts.Token);
await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
原始来源很热。如果没有Distinct
,我显然会得到重复的值,它看起来会产生我期望看到的结果。
有没有更好的方法,事实是,第一个派生流不是周期性的?
您可以将索引附加到源可观察对象中,然后将DistinctUntilChanged
应用到最终合并的可观察对象中。
var withIndex = s.Select((x, i) => (Item : x, Index : i));
var s1 = withIndex.Where(p => p.Item % 5 == 0);
var s2 = withIndex.Sample(TimeSpan.FromMilliseconds(1000));
new[] { s1, s2 }
.Merge()
.DistinctUntilChanged(p => p.Index) // discard duplicates
.Select(p => p.Item) // discard the index
.Subscribe(Console.WriteLine, cts.Token);
我想运算符DistinctUntilChanged
比Distinct
更轻巧,因为它仅缓存最新的元素。