我正在使用https://github.com/dotnet/reactive 我使用 CombineLatest 将 observableA(通过计时器发出项目)与 observableB(手动插入)结合起来。 在 OnNext 方法中,有时我会向 observableB 插入新值 在这种情况下,OnNext 的新迭代会在当前 OnNext 返回之前立即开始。
_subscription = _observable.ClientData
.CombineLatest(_deviceTypeFilterViewModel.Filters)
.Subscribe(OnNext, OnError);
private void OnNext((IClientDataSlice slice, Filtter filter) sliceWithFilter)
{
Logger.Log(Slice.Number + " start");
//... pseudo code:
_deviceTypeFilterViewModel.Filters.Add(someValue)
Logger.Log(Slice.Number + " end");
}
将被执行为:
1 start
2 start
....
代替
1 start
1 finish
2 start
2 finish
当我在托管的 Blazor 服务器中执行相同操作时,它按预期工作:直到先前的方法返回,新的 OnNext 才会启动。这是设计使然吗?
发布一个值就像调用一个事件委托。在你的主题上调用
OnNext
(或者你发布到 ObservableB)将在你的观察者中同步调用 OnNext
,这意味着该方法将在你的代码继续之前运行完成。这可以通过将 .ObserveOn(ThreadPoolScheduler.Default)
添加到您的 rx 链来更改。这样做会导致发布一个值来触发线程池工作,这将等待线程池线程变得可用并在那里运行它。这仍然不会使您的第一个订阅在第二个订阅运行之前完成 - 为此,您可能需要研究原始锁定机制或考虑在您的 rx 链中调用.Synchronize()
(取决于您运行的代码)。请注意,如果您没有正确地将工作分配给线程池,这些选项可能会导致死锁。