Merging two IAsyncEnumerable instances with cancellation support (NotSupportedException)

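I am having trouble implementing the following correctly:
I have two IAsyncEnumerable instances of the same type. One is the "primary" source; the other exists so that the user can "inject" items on demand.
So when the primary source ends, the secondary source must stop as well, which ends the enumeration. The caller of the Merge function also needs to be able to react when the primary source ends (i.e. clean up the secondary enumerable).

Given that, I came up with the following implementation: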

static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<object?> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
    var pIt = primary.GetAsyncEnumerator(ct);
    var sIt = secondary.GetAsyncEnumerator(ct);

    await using var _pIt = pIt.ConfigureAwait(false);
    await using var _sIt = sIt.ConfigureAwait(false);

    Task<bool>? pItTask = null;
    Task<bool>? sItTask = null;

    IAsyncEnumerator<object?>? it = null;
    while (true) {
        Task<bool> task;

        try {

            pItTask ??= pIt.MoveNextAsync().AsTask();
            sItTask ??= sIt.MoveNextAsync().AsTask();


            task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
            if (pItTask == task) {
                pItTask = null;
                it = pIt;
                if (!task.Result) {
                    primaryFinished();
                    yield break;
                }

            } else {

                sItTask = null;
                it = sIt;
            }
        } catch (Exception) {
            primaryFinished();
            yield break;
        }

        if (task.Result) yield return it.Current;
    }
}

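The problem is that if the primary source throws an exception, this throws a NotSupportedException, which messes up my exception handling further up the stack.

I found a GitHub issue that mentions the problem but does not really resolve it (I get the exception even with a delay).

Full code to reproduce the problem:
https://dotnetfiddle.net/v7SgEV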

//Needs the "System.Interactive.Async" nuget package
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

while (true) {
    try {
        using var cts = new CancellationTokenSource();

        var secondaryDataSource = Channel.CreateUnbounded<object?>();

        var reader = PrimaryDataSource(cts.Token).Do(x => {
            Console.WriteLine("---");
        });

        var combinedSource = Merge(
            reader,
            secondaryDataSource.Reader.ReadAllAsync(cts.Token),
            () => secondaryDataSource.Writer.TryComplete(),
            cts.Token
        );

        await foreach (var item in combinedSource) {
            Console.WriteLine(item ?? "<Null>");
            throw new Exception();
        }

    } catch (NotSupportedException ex) {
        throw;
    } catch (Exception) {
    }
}

static async IAsyncEnumerable<object?> PrimaryDataSource([EnumeratorCancellation] CancellationToken ct) {
    while (true) {
        await Task.Delay(Random.Shared.Next(100), ct).ConfigureAwait(false);
        yield return "PrimaryDataSource";

        throw new Exception();
        yield break;
    }
}

static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<object?> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
    var pIt = primary.GetAsyncEnumerator(ct);
    var sIt = secondary.GetAsyncEnumerator(ct);

    await using var _pIt = pIt.ConfigureAwait(false);
    await using var _sIt = sIt.ConfigureAwait(false);

    Task<bool>? pItTask = null;
    Task<bool>? sItTask = null;

    IAsyncEnumerator<object?>? it = null;
    while (true) {
        Task<bool> task;

        try {

            pItTask ??= pIt.MoveNextAsync().AsTask();
            sItTask ??= sIt.MoveNextAsync().AsTask();


            task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
            if (pItTask == task) {
                pItTask = null;
                it = pIt;
                if (!task.Result) {
                    primaryFinished();
                    yield break;
                }

            } else {

                sItTask = null;
                it = sIt;
            }
        } catch (Exception) {
            primaryFinished();
            if (pItTask != null) await pItTask;
            if (sItTask != null) await sItTask;
            yield break;
        }

        if (task.Result) yield return it.Current;
    }
}
c# asynchronous cancellation iasyncenumerable notsupportedexception
1 Answer

Based on the comments from TheodorZoulias and shingo, I found the problem and fixed it:

private static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<IList<object?>> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
    var pIt = primary.GetAsyncEnumerator(ct);
    var sIt = secondary.GetAsyncEnumerator(ct);

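    // ConfigureAwait(false) replaces the CAf() shorthand from the original post,
    // which is assumed to be a thin wrapper around it.
    var _pIt = pIt.ConfigureAwait(false);
    var _sIt = sIt.ConfigureAwait(false);

    Task<bool>? pItTask = null;
    Task<bool>? sItTask = null;

    try {
        IAsyncEnumerator<object?>? it = null;
        while(true) {
            Task<bool> task;
            // MoveNextAsync takes no arguments; the token was already passed to GetAsyncEnumerator.
            pItTask ??= pIt.MoveNextAsync().AsTask();
            sItTask ??= sIt.MoveNextAsync().AsTask();

            task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);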
            if(pItTask == task) {
                pItTask = null;
                it = pIt;
                if(!task.Result) {
                    primaryFinished();
                    yield break;
                }
            } else {
                sItTask = null;
                it = sIt;
            }

            if(task.Result) {
                yield return it.Current;
            }
        }
    } finally {
        try { await _pIt.DisposeAsync(); } catch { }
        try { await _sIt.DisposeAsync(); } catch { }
    }
}

The important part is the try/catch around each DisposeAsync call, because those calls can throw.
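
To illustrate why DisposeAsync can throw here, the following is a minimal sketch, not part of the original answer. It assumes the source is a compiler-generated async iterator; such an iterator is expected to reject DisposeAsync with a NotSupportedException while a MoveNextAsync call is still pending, which is the situation Merge is in for whichever source lost the Task.WhenAny race. The names DisposeWhilePendingDemo, Gated and RunAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DisposeWhilePendingDemo
{
    // Hypothetical source: produces nothing until `gate` completes.
    private static async IAsyncEnumerable<int> Gated(Task gate)
    {
        await gate.ConfigureAwait(false);
        yield return 1;
    }

    // Returns (threwWhilePending, disposedCleanlyAfterAwait).
    public static async Task<(bool, bool)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>();
        IAsyncEnumerator<int> it = Gated(gate.Task).GetAsyncEnumerator();

        // Start a MoveNextAsync and leave it pending, as Merge does
        // for the source that has not produced an item yet.
        Task<bool> pending = it.MoveNextAsync().AsTask();

        bool threwWhilePending = false;
        try
        {
            // Disposing while MoveNextAsync is still in flight.
            await it.DisposeAsync().ConfigureAwait(false);
        }
        catch (NotSupportedException)
        {
            threwWhilePending = true;
        }

        // Let the pending call finish, then dispose again.
        gate.SetResult(true);
        await pending.ConfigureAwait(false);

        bool disposedCleanly = true;
        try
        {
            await it.DisposeAsync().ConfigureAwait(false);
        }
        catch (Exception)
        {
            disposedCleanly = false;
        }

        return (threwWhilePending, disposedCleanly);
    }
}
```

If the assumption holds, RunAsync reports that the first disposal threw and the second did not. That suggests an alternative to swallowing the exception: make sure every pending MoveNextAsync has completed (for example by cancelling the token and awaiting the outstanding tasks) before disposing the enumerators. Whether that is practical depends on whether the secondary source can be made to complete promptly.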
