C# Rx 取消和竞争条件

问题描述 投票:0回答:1

我有一个异步方法,它从 Rx Observable 接收数据并将其写入文件。给这个方法一个cancelToken并在某个随机时间取消它。

该方法是通过await Observable.Do(...).ToTask(cancellationToken)实现的。然而,Observable 的 onNext 事件可能会在取消完成后发生,并导致使用已释放的对象。您可以在下面的消息中看到 onNext 和 finally 之间的线程没有不同。

static async Task Main(string[] args)
{
    var observable = Observable
        .Interval(TimeSpan.FromMilliseconds(10))
        .Timestamp();

    var cts = new CancellationTokenSource();
    var task = SaveAsync(observable, "abc.txt", cts.Token);

    Console.ReadKey();

    cts.Cancel();

    try
    {
        await task;
    }
    catch (OperationCanceledException)
    {
    }

    Console.ReadKey();
}

static async Task SaveAsync(
    IObservable<Timestamped<long>> observable,
    string filepath,
    CancellationToken cancellationToken)
{
    var writer = new StreamWriter(filepath);

    try
    {
        await observable.Do(timestamp =>
        {
            Thread.Sleep(100);

            Console.WriteLine($"Do a thread={Environment.CurrentManagedThreadId}");

            writer.WriteLine(timestamp);

            Console.WriteLine($"Do b");
        }).ToTask(cancellationToken);
    }
    finally
    {
        writer.Dispose();
        Console.WriteLine($"Finally thread={Environment.CurrentManagedThreadId}");
    }
}
Do a thread=4
Do b
Do a thread=4
Do b
Finally thread=1
Do a thread=4

如果我将实现更改为await Observable.Do(...).TakeWhile(_ => !cancellationToken.IsCancellationRequested),则不会出现竞争条件,因为onNext 和finally 之间的线程是相同的。

static async Task SaveAsync(
   IObservable<Timestamped<long>> observable,
   string filepath,
   CancellationToken cancellationToken)
{
    var writer = new StreamWriter(filepath);

    try
    {
        await observable.Do(timestamp =>
        {
            Thread.Sleep(100);

            Console.WriteLine($"Do a thread={Environment.CurrentManagedThreadId}");

            writer.WriteLine(timestamp);

            Console.WriteLine($"Do b");
        }).TakeWhile(_ => !cancellationToken.IsCancellationRequested);
    }
    finally
    {
        writer.Dispose();
        Console.WriteLine($"Finally thread={Environment.CurrentManagedThreadId}");
    }
}
Do a thread=4
Do b
Do a thread=4
Do b
Do a thread=4
Do b
Finally thread=4

对我来说,ToTask(cancellationToken) 对我来说更简单、更自然,有没有办法在不同线程的情况下避免竞争条件?或者有更好的方法来实现这个方法吗?

c# system.reactive
1个回答
0
投票

检查这个,注意

TakeUntil

using System.Reactive;
using System.Reactive.Linq;

internal class Program
{
    static async Task Main(string[] args)
    {
        var observable = Observable
            .Interval(TimeSpan.FromMilliseconds(10))
            .Timestamp();

        var cts = new CancellationTokenSource();
        var task = SaveAsync(observable, "abc.txt", cts.Token);

        Console.ReadKey();

        cts.Cancel();

        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"Shouldn't be here");
        }

        Console.ReadKey();
    }

    static async Task SaveAsync(
        IObservable<Timestamped<long>> observable,
        string filepath,
        CancellationToken cancellationToken)
    {
        await Observable.Using(
            () => new StreamWriter(filepath),
            writer => observable
                .TakeUntil(cancellationToken.ToObservable())
                .Do(timestamp =>
                {
                    Thread.Sleep(100);

                    Console.WriteLine($"Do a thread={Environment.CurrentManagedThreadId}, {timestamp}");

                    writer.WriteLine(timestamp);

                    Console.WriteLine($"Do b, {timestamp}");
                })
                .DefaultIfEmpty()
                .Finally(() => Console.WriteLine($"Finally thread={Environment.CurrentManagedThreadId}"))
        );
    }
}

internal static class Extensions
{
    public static IObservable<Unit> ToObservable(this CancellationToken ct,
        bool throwIfCancellationRequested = false) =>
        Observable.Create<Unit>(observer => ct.Register(() =>
            {
                if (throwIfCancellationRequested)
                    observer.OnError(new OperationCanceledException(ct));
                else
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                }
            })
        );
}
© www.soinside.com 2019 - 2024. All rights reserved.