使用Rx可以实现在指定时间内只能处理指定数量的速率限制器吗?
例如设置为每秒只处理5次,输入如下:
var input = new Queue<string>();
// I'll use System.Threading.Chaannels.Channel<T> here
input.Enqueue("data1"); // at 0.10s
input.Enqueue("data2"); // at 0.20s
input.Enqueue("data3"); // at 0.30s
input.Enqueue("data4"); // at 0.40s
input.Enqueue("data5"); // at 0.50s
input.Enqueue("data6"); // at 0.60s
input.Enqueue("data7"); // at 0.70s
input.Enqueue("data8"); // at 1.20s
input.Enqueue("data9"); // at 1.30s
input.Enqueue("data10"); // at 1.40s
input.Enqueue("data11"); // at 2.20s
input.Enqueue("data12"); // at 2.50s
// The rate limiter will watch the input queue and call OnNext() when it's time to process the next item
var rateLimiter = Observable.Empty<string>();
// Process the input data
rateLimiter.Subscribe(data => { });
// output:
// processing data1 at 0.10s
// processing data2 at 0.20s
// processing data3 at 0.30s
// processing data4 at 0.40s
// processing data5 at 0.50s
// processing data6 at 1.10s
// processing data7 at 1.20s
// processing data8 at 1.30s
// processing data9 at 1.40s
// processing data10 at 1.50s
// processing data11 at 2.20s
// processing data12 at 2.50s
如果我正确理解您的问题,以下解决方案可能适合您:
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
=> source
.Select(p => Observable.Empty<T>().Delay(interval).StartWith(p))
.Concat();
static void Main()
{
var fastSequence = Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Take(10);
var rateLimitedSequence = fastSequence
.Pace(TimeSpan.FromMilliseconds(150));
rateLimitedSequence
.TimeInterval()
.Subscribe(i =>Console.WriteLine(i));
Console.ReadLine();
}
输出:
0@00:00:00.1296188
1@00:00:00.1624077
2@00:00:00.1538291
3@00:00:00.1559773
4@00:00:00.1563906
5@00:00:00.1559143
6@00:00:00.1563431
7@00:00:00.1563380
8@00:00:00.1560298
9@00:00:00.1563190