我正在调查一个复杂的问题,由于某种原因,
BehaviorSubject
向观察者发出错误。
即使没有人真正与该可观察的交互,这种情况也会发生。
唯一的联系似乎是它们都在我的 WPF 应用程序的 UI 线程上进行评估。
这在某种程度上代表了我的代码的样子,但其中可能缺少实际问题:
private readonly BehaviorSubject<int> _exampleSubject;
public void NotifyChanged(int newValue)
{
_exampleSubject.OnNext(newValue);
}
public IObservable<int> _ConstructStream()
{
return _exampleSubject
.Do(
onNext: count => Console.WriteLine($"_exampleSubject.OnNext: {count} (thread: {Thread.CurrentThread.ManagedThreadId})"),
onError: exception => Console.WriteLine($"_exampleSubject.OnError: {exception} (thread: {Thread.CurrentThread.ManagedThreadId})"),
onCompleted: () => Console.WriteLine($"_exampleSubject.OnCompleted (thread: {Thread.CurrentThread.ManagedThreadId})"));
}
由于某种原因,
_exampleSubject
发出异常,这是没有意义的,因为OnError
从未在任何地方被调用。
在另一个位置,可观察对象中的异常没有得到正确处理,这似乎以某种方式干扰了它。
不幸的是,我无法创建一个最小的示例来重现此问题。 似乎只有在包含所有代码时才会出现某种竞争条件。
我已经成功重现了部分问题,这将在下面讨论。
要重现该问题,请使用默认的 WPF 模板并对
App.xaml.cs
进行以下修改:
// Target framework:
// net7.0-windows
// Packages:
// System.Reactive 6.0.0
namespace WpfApp1
{
public partial class App : Application
{
protected override void OnStartup(StartupEventArgs e)
{
base.OnStartup(e);
// ... Call the example methods here.
}
// ... Define the example methods here.
}
}
private async Task _Example_2()
{
var dispatcherSynchronizationContext = new DispatcherSynchronizationContext(Application.Current.Dispatcher);
// Adding this code would no longer reproduce the issue:
// using var disposable = Observable
var disposable = Observable
.Timer(TimeSpan.FromMilliseconds(100))
.ObserveOn(dispatcherSynchronizationContext)
.SubscribeOn(dispatcherSynchronizationContext)
.Subscribe(_ => throw new InvalidOperationException());
// Wait a bit for things to play out.
await Task.Delay(TimeSpan.FromMilliseconds(300));
}
当抛出异常时,似乎会冻结主线程,整个窗口都会冻结,并且同一调度程序上的其他任何内容都会停止处理。 未绑定到同一调度程序的其他可观察量会在应用程序自动关闭之前继续短暂运行。
这正是我所期望的,因为未经处理的异常应该终止应用程序。 不幸的是,没有任何日志消息表明发生了这种情况。
问题 1: 这里奇怪的是,如果我向一次性添加
using
语句,问题就不再出现。我希望在等待延迟之后,处置会在函数结束时发生,但它似乎发生在那之前。
再次看到这个,我对自己的例子感到困惑。 300ms 后,它将从进行 dispose 调用的方法返回。在我的脑海里,我的想法更像是
Timeout.InfiniteTimeSpan
。
有趣的是,同样的问题可以通过
Observable.FromAsync
和 Concat
重现,但前提是在切换发生之前没有延迟。
private async Task _Example_1()
{
var dispatcherSynchronizationContext = new DispatcherSynchronizationContext(Application.Current.Dispatcher);
Observable
.Timer(TimeSpan.FromMilliseconds(100))
.ObserveOn(dispatcherSynchronizationContext)
.SubscribeOn(dispatcherSynchronizationContext)
.Select(index => Observable.FromAsync(async () =>
{
// Adding this code would no longer reproduce the issue:
// await Task.Delay(TimeSpan.FromMilliseconds(100));
throw new InvalidOperationException();
}))
.Concat()
.Subscribe();
// Wait a bit for things to play out.
await Task.Delay(TimeSpan.FromMilliseconds(300));
}
问题2:看起来,当它切换线程并暂停执行时,发生了一些奇怪的事情。 这似乎改变了行为。添加延迟仍然会导致调度程序死亡。
我能够解决我的具体问题,但我仍然不明白为什么异常归因于
BehaviorSubject
。我还是不太明白,但这似乎解决了我遇到的“问题”。
我不明白发生的每一个小细节,但我的理解如下:
FromAsync
本质上是StartAsync
的包装器,它将任务的执行推迟到发生订阅为止。
StartAsync
的实现将启动您的任务,然后使用Task.ToObservable()
并返回该任务。
ToObservable
的实现最终会运行以下代码:
if (_scheduler == null)
{
_task.ContinueWith(
static (t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject!),
observer,
cts.Token,
options,
TaskScheduler.Current);
}
TaskScheduler.Current
表示延续将在任务调度程序而不是当前同步上下文上运行。
这个程序完美地重现了这个问题:
internal class Program
{
public static async Task Main()
{
Dispatcher.CurrentDispatcher.InvokeAsync(MainOnDispatcher);
Dispatcher.Run();
}
public static void MainOnDispatcher()
{
var triggerObservable = new[] { 1 }.ToObservable();
triggerObservable
.ObserveOnDispatcher()
.Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) before from async"))
.Select(_ => Observable.FromAsync(async () => await ExampleAsync()))
.Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) after from async"))
.Concat()
.Do(_ => Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) after concat"))
.Subscribe();
}
private static async Task ExampleAsync()
{
Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) about to wait");
await Task.Delay(TimeSpan.FromMilliseconds(100));
Console.WriteLine($"(thread: {Thread.CurrentThread.ManagedThreadId}) done waiting");
}
}
(thread: 1) before from async
(thread: 1) after from async
(thread: 1) about to wait
(thread: 1) done waiting
(thread: 4) after concat
注意,当你消费
FromAsync
的结果时,线程是如何切换的,但里面的所有代码都是在主线程上运行的。我不确定这是否是预期的行为或只是一个实施问题。因此,我在 GitHub 上创建了一个问题。
如果从中删除延迟,行为会发生变化,因为默认情况下它将使用
ImmediateScheduler
(那么 _scheduler
不是 null
)。这就是让这变得更加混乱的原因。
这几乎就是导致我出现问题的原因。由于某种未知的原因,异常实际上归因于错误的可观察值,并且我的调试器停在上面讨论的
BehaviorSubject
处。我仍然不确定是什么导致了这个问题。