我的问题有点像创建 Nagle 算法 来解决的问题,但不完全一样。我想要的是将来自
OnNext
的 IObservable<T>
通知缓冲到一系列 IObservable<IList<T>>
中,如下所示:
T
通知到达时,将其添加到缓冲区并开始倒计时T
通知,请将其添加到缓冲区 并重新开始倒计时T
通知作为单个聚合 IList<T>
通知转发。IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)
看起来很有希望,但它似乎定期发送聚合通知,而不是执行我想要的“当第一个通知到达时启动计时器并在其他通知到达时重新启动计时器”行为,并且它还会发送如果下面没有生成任何通知,则每个时间窗口末尾会出现一个空列表。
我确实不想删除任何
T
通知;只是缓冲它们。
有这样的东西存在吗,还是我需要自己写?
SO 上存在一些类似的问题,但不完全像这样。 这是一个可以解决问题的扩展方法。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
有趣的运算符。 Supertopi 的答案很好,但还有可以改进的地方。如果
maxAmount
很大,和/或通知率很高,那么使用 Buffer
会通过分配不久后被丢弃的缓冲区来消耗 GC。
为了在达到
GroupBy
后关闭每个 maxAmount
Observable,您不需要捕获所有这些元素的 Buffer
只是为了知道它何时已满。根据Supertopi的回答,您可以将其稍微更改为以下内容。它不是收集 Buffer
的 maxAmount
元素,而是在看到流上的 maxAmount
元素后发出信号。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Take(maxAmount)
.LastAsync()
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
很好的解决方案。在我看来,使用现有运算符创建行为只是为了方便而不是为了性能。
此外,我们应该始终返回 IEnumerable 而不是 IList。 返回最少派生类型 (IEnumerable) 将为您留下最大的余地来更改底层实现。
这是我实现自定义运算符的版本。
public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
{
var buffer = new List<TValue>();
return Observable.Create<IEnumerable<TValue>>(observer =>
{
var aTimer = new Timer();
void Clear()
{
aTimer.Stop();
buffer.Clear();
}
void OnNext()
{
observer.OnNext(buffer);
Clear();
}
aTimer.Interval = threshold.TotalMilliseconds;
aTimer.Enabled = true;
aTimer.Elapsed += (sender, args) => OnNext();
var subscription = @this.Subscribe(value =>
{
buffer.Add(value);
if (buffer.Count >= maxAmount)
OnNext();
else
{
aTimer.Stop();
aTimer.Start();
}
});
return Disposable.Create(() =>
{
Clear();
subscription.Dispose();
});
});
}
通过测试与其他解决方案相比的性能,它可以节省高达 30% 的 CPU 功耗并解决内存问题。
在 Ian Griffiths 在 site 上发布的有关 rx 的文章中,LINQ 运算符和组合 部分中有一个实现此类运算符的示例。那里还给出了他的工作的详细描述。
static class RxExt
{
public static IObservable<IList<T>> Quiescent<T>( this IObservable<T> src,
TimeSpan minimumInactivityPeriod, IScheduler scheduler) {
IObservable<int> onoffs =
from _ in src
from delta in
Observable.Return(1, scheduler)
.Concat(Observable.Return(-1, scheduler)
.Delay(minimumInactivityPeriod, scheduler))
select delta;
IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);
return src.Buffer(zeroCrossings);
}
}