我有一个冷的观察者和一个观察者。两者都很慢,但是观察者比可观察者慢。他们处理许多通知,因此我不想无限制地存储通知。
此示例大约需要30秒才能完成。非常慢。我相信他们可以在21秒内完成。
var subject = new Subject<int>();
subject.Subscribe(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
Task.Run(() =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
foreach (var i in Enumerable.Range(0, 10))
{
Thread.Sleep(1000);
subject.OnNext(i);
}
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});
此示例大约在20秒内完成,但是在观察者显示“ 4”之前,可观察到的结束。它指示调度程序将4到9存储在某个位置。我担心它是否存储超过1'000'000通知并抛出OutOfMemoryException。
var subject = new Subject<int>();
subject.ObserveOn(ThreadPoolScheduler.Instance).Subscribe(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
Task.Run(() =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
foreach (var i in Enumerable.Range(0, 10))
{
Thread.Sleep(1000);
subject.OnNext(i);
}
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});
这就是为什么我想限制调度程序中的通知数量。
编辑:图表
x : calculate or other task
S : send notification
R : revive notification
-- time -->
sample1
thread1: xxxS xxxS xxxS
thread2: Rxxxxxx Rxxxxxx Rxxxxxx
sample2
thread1: xxxSxxxSxxxSxxxSxxxSxxxSxxxS
thread2: RxxxxxxRxxxxxxRxxxxxxRxxxxxx
I want
thread1: xxxSxxxS xxxS xxxS xxxS
thread2: RxxxxxxRxxxxxxRxxxxxxRxxxxxx
Rx合同要求通知必须序列化,因此,即使您可以指定Scheduler
,它也更像是说“在这里,使用此调度程序来管理并发性。”
ThreadPoolScheduler
仍将序列化通知,因此最终结果是它不会并行调用您的方法。
如果要异步执行,可以将其重写为:
subject.Subscribe(async i =>
{
await Task.Delay(1000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
但是潜在的问题是您的消费者落后于生产者。您可以考虑使用backpressure,或者如果您的应用程序是一系列数据处理任务,则还可以考虑使用出色的TPL数据流。
事件源与其最终接收器之间的管道中发生了什么,这是Rx发挥最大作用的地方。