我需要帮助使用 Rx 创建滑动窗口速率限制器

问题描述 投票:0回答:1

使用Rx可以实现在指定时间内只能处理指定数量的速率限制器吗?

例如设置为每秒只处理5次,输入如下:

var input = new Queue<string>();

// I'll use System.Threading.Chaannels.Channel<T> here
input.Enqueue("data1"); // at 0.10s
input.Enqueue("data2"); // at 0.20s
input.Enqueue("data3"); // at 0.30s
input.Enqueue("data4"); // at 0.40s
input.Enqueue("data5"); // at 0.50s
input.Enqueue("data6"); // at 0.60s
input.Enqueue("data7"); // at 0.70s
input.Enqueue("data8"); // at 1.20s
input.Enqueue("data9"); // at 1.30s
input.Enqueue("data10"); // at 1.40s
input.Enqueue("data11"); // at 2.20s
input.Enqueue("data12"); // at 2.50s

// The rate limiter will watch the input queue and call OnNext() when it's time to process the next item
var rateLimiter = Observable.Empty<string>();

// Process the input data
rateLimiter.Subscribe(data => { });

// output:
// processing data1 at 0.10s
// processing data2 at 0.20s
// processing data3 at 0.30s
// processing data4 at 0.40s
// processing data5 at 0.50s
// processing data6 at 1.10s
// processing data7 at 1.20s
// processing data8 at 1.30s
// processing data9 at 1.40s
// processing data10 at 1.50s
// processing data11 at 2.20s
// processing data12 at 2.50s
c# reactive-programming reactivex rx.net
1个回答
0
投票

如果我正确理解您的问题,以下解决方案可能适合您:

public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
    => source
        .Select(p => Observable.Empty<T>().Delay(interval).StartWith(p))
        .Concat();

static void Main()
{
  var fastSequence = Observable
      .Interval(TimeSpan.FromMilliseconds(100))
      .Take(10);

  var rateLimitedSequence = fastSequence
      .Pace(TimeSpan.FromMilliseconds(150));

  rateLimitedSequence
      .TimeInterval()
      .Subscribe(i =>Console.WriteLine(i));


  Console.ReadLine();
}

输出:

0@00:00:00.1296188
1@00:00:00.1624077
2@00:00:00.1538291
3@00:00:00.1559773
4@00:00:00.1563906
5@00:00:00.1559143
6@00:00:00.1563431
7@00:00:00.1563380
8@00:00:00.1560298
9@00:00:00.1563190
© www.soinside.com 2019 - 2024. All rights reserved.