我有一个以可变速度生成值的 Observable。为了不被值淹没,我添加了三秒的 Throttle,因此只有在三秒内没有发布更多值的情况下我才会获得一个值。但我想要的是,如果我在一段时间内获得一定数量的更新,则结束流并将其替换为另一个可观察的。
例如,如果我在三秒内收到 50 个更新,则结束流并将其替换为不同的流,类似于 Catch 如何将因异常终止的可观察对象替换为另一个可观察对象。
类似于下面的内容,但没有抛出异常,因此不能使用 Catch:
myObservable
.Throttle(TimeSpan.FromSeconds(3)) //Not sure if we need to remove Throttle
.Catch<long, Exception>(e => Observable.Return(long)0) //Instead of catching an exception, some way to monitor how many updates are coming in before throttling
.Subscribe
编辑: 我添加了一个弹珠图来尝试展示我正在寻找的内容。
初始可观察值以可变速率产生值。值 1-6 进来,3 秒内突发 50 个值都没有,这些值传递到节流阀,产生最终值 1、5 和 6。
然后,初始可观察值在 3 秒内生成值 7-60。这是我想做的事情“???”正在显示。这个想法是为了认识到在设定的时间范围内生产了 50 件或更多的物品,完成原始的 obs。并将其替换为我提供的,类似于您提供 obs 的方式。 Catch 中的序列来替换出错的序列(例如,如果我看到原始序列产生了巨大的突发并引发了异常)。
初始 obs 之后。被替换后,序列将继续使用新的序列,生产的物品将通过现有的节流阀。
如果在“???”中检查的时间范围内只有 49 个项目,这些值将全部传递到 Throttle,并且只会生成最后一个。如果根本没有更新,那么什么也不会发生,也不会产生任何输出。
希望我现在问的问题更清楚了。
Scan()
构建最后 50 个项目的滑动窗口(无法使其与 Buffer()
或 Window()
一起使用,但我想这是可能的)。每个项目都带有时间戳。然后,对于每个滑动窗口,检查第一个和最后一个时间戳并检查它们是否太接近。如果是这种情况,您可以切换到另一个可观察的。
Select()
:
IObservable<Tuple<int, DateTime>> sourceWithTimeInfo = source.Select(it =>
{
return Tuple.Create(it, DateTime.UtcNow);
});
然后使用
Scan()
构建滑动窗口:
IObservable<IList<Tuple<int, DateTime>>> bufferedSource = sourceWithTimeInfo.Scan((IList<Tuple<int, DateTime>>)new List<Tuple<int, DateTime>>(),
(acc, it) =>
{
while (acc.Count >= countLimit)
{
acc.RemoveAt(0);
}
acc.Add(it);
return acc;
});
现在您的“值”是一个元组列表,其中每个条目都有值及其发出的时间戳。当开始和结束时间戳太接近时,我们使用
TakeWhile()
停止发出值:
IObservable<IList<Tuple<int, DateTime>>> stopWhenTooMuchSource = bufferedSource.TakeWhile(it =>
{
if (it.Count < countLimit)
{
return true;
}
DateTime firstTime = it.First().Item2;
DateTime lastTime = it.Last().Item2;
TimeSpan timeDiff = lastTime - firstTime;
if (timeDiff < TimeSpan.FromSeconds(timeLimitInSeconds))
{
return false;
}
return true;
});
出于调试目的,我们将打印第一个和最后一个条目的值:
IObservable<IList<Tuple<int, DateTime>>> withDebugging = stopWhenTooMuchSource.Do(it =>
{
Console.WriteLine("Count: "+it.Count);
if (it.Count > 0)
{
DateTime firstTime = it.First().Item2;
DateTime lastTime = it.Last().Item2;
TimeSpan timeDiff = lastTime - firstTime;
Console.WriteLine("Timediff is: "+timeDiff);
}
});
然后我们再次“提取”原始值:
IObservable<int> onlyValueOfLastItem = withDebugging.Select(it => it.Last().Item1);
现在我们有一个流,当时间戳太接近时,它会“死亡”。我们可以使用简单的
Concat()
(或 Switch()
)切换到另一个可观察值:
IObservable<int> concatSource = onlyValueOfLastItem.Concat(Observable.Return(-1));
这是完整的源代码:
static void Main(string[] args)
{
ISubject<int> source = new Subject<int>();
IObservable<Tuple<int, DateTime>> sourceWithTimeInfo = source.Select(it =>
{
return Tuple.Create(it, DateTime.UtcNow);
});
int countLimit = 50;
int timeLimitInSeconds = 3;
IObservable<IList<Tuple<int, DateTime>>> bufferedSource = sourceWithTimeInfo.Scan((IList<Tuple<int, DateTime>>)new List<Tuple<int, DateTime>>(),
(acc, it) =>
{
while (acc.Count >= countLimit)
{
acc.RemoveAt(0);
}
acc.Add(it);
return acc;
});
IObservable<IList<Tuple<int, DateTime>>> stopWhenTooMuchSource = bufferedSource.TakeWhile(it =>
{
if (it.Count < countLimit)
{
return true;
}
DateTime firstTime = it.First().Item2;
DateTime lastTime = it.Last().Item2;
TimeSpan timeDiff = lastTime - firstTime;
if (timeDiff < TimeSpan.FromSeconds(timeLimitInSeconds))
{
return false;
}
return true;
});
IObservable<IList<Tuple<int, DateTime>>> withDebugging = stopWhenTooMuchSource.Do(it =>
{
Console.WriteLine("Count: "+it.Count);
if (it.Count > 0)
{
DateTime firstTime = it.First().Item2;
DateTime lastTime = it.Last().Item2;
TimeSpan timeDiff = lastTime - firstTime;
Console.WriteLine("Timediff is: "+timeDiff);
}
});
IObservable<int> onlyValueOfLastItem = withDebugging
.Select(it => it.Last().Item1);
IObservable<int> concatSource = onlyValueOfLastItem.Concat(Observable.Return(-1));
Console.WriteLine("Subscribe start");
concatSource.Subscribe(it =>
{
Console.WriteLine(it);
});
Thread t = new Thread(() =>
{
int maxDelay = 300;
int counter = 1;
while (maxDelay > 0)
{
Console.WriteLine("maxDelay is: "+maxDelay);
source.OnNext(counter++);
int sleepAmount = Random.Shared.Next(1, maxDelay);
maxDelay--;
Thread.Sleep(sleepAmount);
}
});
t.Start();
t.Join();
Console.WriteLine("Program ends");
}
这可以生成如下输出:
[...]
maxDelay is: 111
Count: 50
Timediff is: 00:00:03.0516947
190
maxDelay is: 110
Count: 50
Timediff is: 00:00:03.0656792
191
maxDelay is: 109
Count: 50
Timediff is: 00:00:03.0892908
192
maxDelay is: 108
Count: 50
Timediff is: 00:00:03.1163132
193
maxDelay is: 107
Count: 50
Timediff is: 00:00:03.1003305
194
maxDelay is: 106
Count: 50
Timediff is: 00:00:03.0078090
195
maxDelay is: 105
-1
[...]
您仍然需要在正确的位置再次读取
Throttle()
调用。