Subject.HasObservers在附加的示例代码中不会立即成为未确定数量的刻度。如果我取出SubscribeOn(),HasObservers总是如此,所以我知道它与IScheduler初始化有关。
这导致我们的生产软件出现问题,尽管在允许调用OnNext()的线程被允许继续之前保证IDisposable订阅变量已经初始化,但是前几次调用OnNext()无处可去。这是RX中的错误吗?
有什么其他方法可以使用System.Reactive类来保证订阅是在没有轮询的情况下使用调度程序设置的?
我试过了Subject.Synchronize(),但没有区别。
static void Main(string[] args)
{
for (int i = 0; i < 100; i++)
{
var source = new Subject<long>();
IDisposable subscription = source
.SubscribeOn(ThreadPoolScheduler.Instance)
.Subscribe(Console.WriteLine);
// 0 and 668,000 ticks for subscription setup, but rarely 0.
int iterations = 0;
while (!source.HasObservers)
{
iterations++;
Thread.SpinWait(1);
}
// Next line would rarely output to Console without while loop
source.OnNext(iterations);
subscription.Dispose();
source.Dispose();
}
}
我预计Subject.HasObservers在没有轮询的情况下是真的。
据我所知,问题是您的订阅是异步完成的:调用未被阻止,因此真正的订阅将在稍后的其他线程上完成。
我没有找到确切知道订阅是否确实落地的确切方法(甚至根本不可能)。如果您的问题是第一个OnNext
和订阅之间的竞争,那么您可能需要使用Replay()
+ Connect()
将Observable转换为可连接的Observable。这样您就可以确保每个订阅者获得完全相同的序列。
using (var source = new Subject<long>())
{
var connectableSource = source.Replay();
connectableSource.Connect();
using (var subscription = connectableSource
.SubscribeOn(ThreadPoolScheduler.Instance)
.Subscribe(Console.WriteLine))
{
source.OnNext(42); // outputs 42 always
Console.ReadKey(false);
}
}
在我的代码中,我仍然需要Console.ReadKey
,因为在其他线程上完成订阅和取消订阅之间的竞争。
我现在提出的解决方案,我希望有人可以改进:
public class SubscribedSubject<T> : ISubject<T>, IDisposable
{
private readonly Subject<T> _subject = new Subject<T>();
private readonly ManualResetEventSlim _subscribed = new ManualResetEventSlim();
public bool HasObservers => _subject.HasObservers;
public void Dispose() => _subject.Dispose();
public void OnCompleted() => Wait().OnCompleted();
public void OnError(Exception error) => Wait().OnError(error);
public void OnNext(T value) => Wait().OnNext(value);
public IDisposable Subscribe(IObserver<T> observer)
{
IDisposable disposable = _subject.Subscribe(observer);
_subscribed.Set();
return disposable;
}
private Subject<T> Wait()
{
_subscribed.Wait();
return _subject;
}
}
使用示例:
using (var source = new SubscribedSubject<long>())
{
using (source
.SubscribeOn(ThreadPoolScheduler.Instance)
.Subscribe(Console.WriteLine))
{
source.OnNext(42);
Console.ReadKey();
}
}