好吧,虽然已经很晚了,但我还是无法解决为什么会发生以下情况。
我试图测试以下内容(简化版)。IConnectableObservable<long>
:
private const int PollingIntervalMinutes = 5;
private IConnectableObservable<long> CreateObservable(IScheduler scheduler)
{
return Observable
.Interval(TimeSpan.FromMinutes(PollingIntervalMinutes), scheduler)
.StartWith(0)
.Publish();
}
如果我按照以下方式进行 "长手 "测试,则测试通过。
[Test]
public void ShouldReturnExpectedNumberOfMessagesLongHand()
{
var scheduler = new TestScheduler();
var observed = scheduler.CreateObserver<long>();
var observable = CreateObservable(scheduler);
observable.Subscribe(observed);
observable.Connect();
Assert.That(observed.Messages.Count, Is.EqualTo(1));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(2));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(3));
scheduler.AdvanceBy(TimeSpan.FromMinutes(PollingIntervalMinutes).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(4));
}
但是,如果我用 TestScheduler.Start
办法--如下--测试挂起,永远不会达到。Assert
:
[Test]
public void ShouldReturnExpectedNumberOfMessages()
{
var scheduler = new TestScheduler();
var observable = CreateObservable(scheduler);
var observed = scheduler.Start(() => { observable.Connect(); return observable; }, TimeSpan.FromMinutes(PollingIntervalMinutes * 3).Ticks);
Assert.That(observed.Messages.Count, Is.EqualTo(4));
}
通过在可观察到的情况下放置一个断点(即在另一个断点上)。Select
或 Do
)我可以看到,呼吁 scheduler.Start
导致底层的观测值旋转(即数千次点击断点),而不是尊重预定的时间。
我已经尝试了各种不同的方法来调用 Connect
在...上 IConnectableObservable
(即在调用开始前连接,在TestScheduler中安排调用连接等),但没有用。
这肯定与测试一个IConnectableObservable有关,因为在测试中删除了 Publish
(即使其成为正常的冷门观察对象)使测试通过。
如果您能给我一个理智的检查和建议,我将非常感激。
未处理的出版商 再次打击。
通常的嫌疑人。
var observable = CreateObservable(scheduler);
scheduler.Start(() => { observable.Connect(); return observable; }, ...
要想真正处理掉间隔计时器, 你需要有办法处理掉订阅的问题 observable.Connect()
而不是由政府认购。Start
方法。
一旦你连接,你的间隔就会使用测试调度器(以最快的速度)调出项目,而退订实际上并没有做任何事情,让它运行--而测试调度器永远不会完成。
一般来说,确保资源处置的一种方法是使用 Using
.
scheduler.Start(() => Observable.Using(() => observable.Connect(), _ => observable), ...
但是,一个更简单的方法是,当下游的观测值被取消订阅时,确保原始的发布连接被处置,使用 RefCount
.
scheduler.Start(() => CreateObservable(scheduler).RefCount(), ...