有谁知道合并运算符是如何实现的?我惊讶地发现 Merge 运算符可以正确合并冷可观测值:
var odd = new int[] { 1, 3 }.ToObservable().Trace("odd");
var even = new int[] { 2, 4 }.ToObservable().Trace("even");
odd.Merge(even).Dump("Merged");
输出:
odd1: Subscribe()
even1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: OnNext(2)
Merged on 1 -->2
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(4)
Merged on 1 -->4
odd1: OnCompleted()
odd1: Dispose()
even1: OnCompleted()
even1: Dispose()
Merged on 1 completed
我很快就发布了 ToObservable() 调用使用 Scheduler.CurrentThread 来完成其工作,从而允许发生“协作调度”。我使用对 Scheduler.CurrentThread.Schedule() 的递归调用为奇数构建了自己的可观察实现。这使得 Merge 运算符的行为与新的 int[] { 1, 3, 5}.ToObservable() observable 的行为相同。到目前为止,一切都很好。我现在试图弄清楚合并运算符如何在幕后工作以允许协作调度。 我编写了自己版本的合并运算符来尝试帮助我的理解。我尝试过以两种不同的方式安排对可观察量的订阅调用,但都没有复制原始的合并行为。
public class MergedObservable<T> : IObservable<T>
{
private readonly IObservable<T> mSource;
private readonly IObservable<T> mSecond;
public IDisposable Subscribe(IObserver<T> observer)
{
Scheduler.CurrentThread.Schedule(() =>
{
mSource.Subscribe(internalObserver);
Scheduler.CurrentThread.Schedule(() =>
{
mSecond.Subscribe(internalObserver);
});
});
}
}
odd1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: Subscribe()
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(2)
Merged on 1 -->2
odd1: OnCompleted()
odd1: Dispose()
even1: OnNext(4)
Merged on 1 -->4
even1: OnCompleted()
Merged on 1 completed
even1: Dispose()
或者当使用scheduleAsync时我有:
Scheduler.CurrentThread.ScheduleAsync(async (s, t) =>
{
retVal.Add(mSource.Subscribe(internalObserver));
await s.Yield();
retVal.Add(mSecond.Subscribe(internalObserver));
await s.Yield();
return Disposable.Empty;
});
odd1: Subscribe()
odd1: OnNext(1)
Merged on 1 -->1
even1: Subscribe()
odd1: OnNext(3)
Merged on 1 -->3
even1: OnNext(2)
Merged on 1 -->2
odd1: OnCompleted()
odd1: Dispose()
even1: OnNext(4)
Merged on 1 -->4
even1: OnCompleted()
Merged on 1 completed
even1: Dispose()
在这两种情况下,对第一个订阅的调用都会在对下一个可观察对象的订阅调用之前产生对 OnNext 的调用。我缺少什么?非常感谢任何帮助。
我认为写下这个问题可以帮助我更好地理解这个问题。我不需要递归地调度或调用收益。我只需要安排两个电话来订阅:
Scheduler.CurrentThread.Schedule(() =>
{
mSource.Subscribe(internalObserver);
mSecond.Subscribe(internalObserver);
});