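I have an async method that receives data from an Rx Observable and writes it to a file. The method is given a CancellationToken and is cancelled at some random time.
The method is implemented with await Observable.Do(...).ToTask(cancellationToken). However, the Observable's onNext callback can still run after cancellation has completed, which leads to using a disposed object. In the output below you can see that onNext and finally run on different threads.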
static async Task Main(string[] args)
{
    var observable = Observable
        .Interval(TimeSpan.FromMilliseconds(10))
        .Timestamp();
    var cts = new CancellationTokenSource();
    var task = SaveAsync(observable, "abc.txt", cts.Token);
    Console.ReadKey();
    cts.Cancel();
    try
    {
        await task;
    }
    catch (OperationCanceledException)
    {
    }
    Console.ReadKey();
}

static async Task SaveAsync(
    IObservable<Timestamped<long>> observable,
    string filepath,
    CancellationToken cancellationToken)
{
    var writer = new StreamWriter(filepath);
    try
    {
        await observable.Do(timestamp =>
        {
            Thread.Sleep(100);
            Console.WriteLine($"Do a thread={Environment.CurrentManagedThreadId}");
            writer.WriteLine(timestamp);
            Console.WriteLine($"Do b");
        }).ToTask(cancellationToken);
    }
    finally
    {
        writer.Dispose();
        Console.WriteLine($"Finally thread={Environment.CurrentManagedThreadId}");
    }
}
Do a thread=4
Do b
Do a thread=4
Do b
Finally thread=1
Do a thread=4
If I change the implementation to await Observable.Do(...).TakeWhile(_ => !cancellationToken.IsCancellationRequested), the race condition does not occur, because onNext and finally run on the same thread.
static async Task SaveAsync(
    IObservable<Timestamped<long>> observable,
    string filepath,
    CancellationToken cancellationToken)
{
    var writer = new StreamWriter(filepath);
    try
    {
        await observable.Do(timestamp =>
        {
            Thread.Sleep(100);
            Console.WriteLine($"Do a thread={Environment.CurrentManagedThreadId}");
            writer.WriteLine(timestamp);
            Console.WriteLine($"Do b");
        }).TakeWhile(_ => !cancellationToken.IsCancellationRequested);
    }
    finally
    {
        writer.Dispose();
        Console.WriteLine($"Finally thread={Environment.CurrentManagedThreadId}");
    }
}
Do a thread=4
Do b
Do a thread=4
Do b
Do a thread=4
Do b
Finally thread=4
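ToTask(cancellationToken) feels simpler and more natural to me. Is there a way to avoid the race condition when the threads are different? Or is there a better way to implement this method?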
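Check this out. Note the TakeUntil, which ends the sequence when the token is cancelled, and Observable.Using, which ties the writer's lifetime to the subscription: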
using System.Reactive;
using System.Reactive.Linq;

internal class Program
{
    static async Task Main(string[] args)
    {
        var observable = Observable
            .Interval(TimeSpan.FromMilliseconds(10))
            .Timestamp();
        var cts = new CancellationTokenSource();
        var task = SaveAsync(observable, "abc.txt", cts.Token);
        Console.ReadKey();
        cts.Cancel();
        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"Shouldn't be here");
        }
        Console.ReadKey();
    }

    static async Task SaveAsync(
        IObservable<Timestamped<long>> observable,
        string filepath,
        CancellationToken cancellationToken)
    {
        await Observable.Using(
            () => new StreamWriter(filepath),
            writer => observable
                .TakeUntil(cancellationToken.ToObservable())
                .Do(timestamp =>
                {
                    Thread.Sleep(100);
                    Console.WriteLine($"Do a thread={Environment.CurrentManagedThreadId}, {timestamp}");
                    writer.WriteLine(timestamp);
                    Console.WriteLine($"Do b, {timestamp}");
                })
                .DefaultIfEmpty()
                .Finally(() => Console.WriteLine($"Finally thread={Environment.CurrentManagedThreadId}"))
        );
    }
}

internal static class Extensions
{
    public static IObservable<Unit> ToObservable(this CancellationToken ct,
        bool throwIfCancellationRequested = false) =>
        Observable.Create<Unit>(observer => ct.Register(() =>
        {
            if (throwIfCancellationRequested)
                observer.OnError(new OperationCanceledException(ct));
            else
            {
                observer.OnNext(Unit.Default);
                observer.OnCompleted();
            }
        })
        );
}
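
If you would rather keep ToTask(cancellationToken), another option is to make the late callback harmless instead of preventing it. Below is a minimal sketch, assuming a hypothetical GuardedWriter helper (my own name, not part of Rx or of the code above). It puts writes and disposal behind one lock, so a Do callback that arrives after the finally block has run becomes a no-op rather than touching a disposed StreamWriter.

```csharp
using System;
using System.IO;

// Hypothetical helper (not part of Rx): serializes writes and disposal
// so that a write attempted after disposal is skipped instead of throwing.
public sealed class GuardedWriter : IDisposable
{
    private readonly object _gate = new object();
    private readonly TextWriter _writer;
    private bool _disposed;

    public GuardedWriter(TextWriter writer) => _writer = writer;

    // Returns false (and writes nothing) once the writer has been disposed.
    public bool TryWriteLine(string line)
    {
        lock (_gate)
        {
            if (_disposed) return false;
            _writer.WriteLine(line);
            return true;
        }
    }

    // Waits for any write currently holding the lock, then disposes once.
    public void Dispose()
    {
        lock (_gate)
        {
            if (_disposed) return;
            _disposed = true;
            _writer.Dispose();
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var guard = new GuardedWriter(new StringWriter());
        Console.WriteLine(guard.TryWriteLine("before dispose")); // True
        guard.Dispose();
        Console.WriteLine(guard.TryWriteLine("after dispose"));  // False
    }
}
```

In SaveAsync you would wrap the StreamWriter in a GuardedWriter, call TryWriteLine(timestamp.ToString()) inside Do, and call Dispose in the finally block. This only protects the writer: the Do callback itself may still run once after cancellation, so the TakeUntil approach above is probably the cleaner fix if you need the callback to stop as well.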