我在正确实现以下功能时遇到了困难:
我有两个 `IAsyncEnumerable` 实例(元素类型相同),一个是“主要”源,另一个的作用是让用户可以按需“插入”项目。为此,我写出了以下实现:
// Merges two async sequences into one: each loop iteration yields the next item from
// whichever enumerator completes MoveNextAsync first, and the merged sequence ends when
// the primary sequence ends.
//
// primary         - main sequence; when it reports no more items the merge stops.
// secondary       - side sequence whose items are interleaved with the primary's.
// primaryFinished - callback invoked when the primary ends or when the try block throws.
// ct              - cancellation token handed to both GetAsyncEnumerator calls.
//
// NOTE(review): the accompanying write-up reports that this version surfaces a
// NotSupportedException when the primary source throws; presumably it is raised by the
// `await using` disposal of an enumerator whose MoveNextAsync is still pending - confirm.
static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<object?> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
// Dispose both enumerators asynchronously when this iterator exits.
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
// In-flight MoveNextAsync call per enumerator; null means none is outstanding.
Task<bool>? pItTask = null;
Task<bool>? sItTask = null;
// Enumerator whose MoveNextAsync completed most recently.
IAsyncEnumerator<object?>? it = null;
while (true) {
Task<bool> task;
try {
// Start a MoveNextAsync only for an enumerator that has none outstanding.
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
// Primary exhausted: notify the caller and end the merged sequence.
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
// Any failure inside the try is swallowed: the caller is notified and the sequence ends.
primaryFinished();
yield break;
}
// Kept outside the try: C# does not allow `yield return` inside a try block that has a catch clause.
if (task.Result) yield return it.Current;
}
}
问题是:如果主要源抛出异常,这段代码最终会抛出 `NotSupportedException`,从而打乱调用栈上层的异常处理。
我找到了一个提到此问题的 GitHub issue,但它并没有真正解决问题(即使加上延迟,我仍然会遇到该异常)。
重现问题的完整代码:
https://dotnetfiddle.net/v7SgEV
//Needs the "System.Interactive.Async" nuget package
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
// Repro driver: builds a fresh primary source, channel and merged sequence on every pass.
// The consumer throws right after printing the first item. A NotSupportedException is
// allowed to escape (ending the program); every other exception is swallowed and the
// scenario is run again.
for (;;) {
    try {
        using var cancellation = new CancellationTokenSource();
        var token = cancellation.Token;

        // Channel whose reader plays the role of the secondary ("injected") sequence.
        var injected = Channel.CreateUnbounded<object?>();

        var primaryItems = PrimaryDataSource(token).Do(_ => {
            Console.WriteLine("---");
        });
        var secondaryItems = injected.Reader.ReadAllAsync(token);

        // Completing the writer lets the secondary reader finish once the primary is done.
        var merged = Merge(primaryItems, secondaryItems, () => injected.Writer.TryComplete(), token);

        await foreach (var element in merged) {
            Console.WriteLine(element ?? "<Null>");
            throw new Exception();
        }
    } catch (NotSupportedException) {
        // The exception under investigation: let it propagate unchanged.
        throw;
    } catch (Exception) {
        // Expected failures are ignored; loop and try again.
    }
}
// Simulated primary source for the repro: waits a random delay below 100 ms, emits a
// single item, and then fails when the consumer asks for the next one.
//
// ct - cancellation token observed by the initial delay.
static async IAsyncEnumerable<object?> PrimaryDataSource([EnumeratorCancellation] CancellationToken ct) {
    var delayMilliseconds = Random.Shared.Next(100);
    await Task.Delay(delayMilliseconds, ct).ConfigureAwait(false);

    yield return "PrimaryDataSource";

    // Reached on the consumer's second MoveNextAsync call.
    throw new Exception();
}
// Repro variant of the merge: yields items from whichever of the two enumerators completes
// MoveNextAsync first and ends when the primary sequence ends. On failure it notifies the
// caller and then awaits any MoveNextAsync call that is still outstanding before ending.
//
// primary         - main sequence; when it reports no more items the merge stops.
// secondary       - side sequence whose items are interleaved with the primary's.
// primaryFinished - callback invoked when the primary ends or when the try block throws.
// ct              - cancellation token handed to both GetAsyncEnumerator calls.
static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<object?> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
// Dispose both enumerators asynchronously when this iterator exits.
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
// In-flight MoveNextAsync call per enumerator; null means none is outstanding.
Task<bool>? pItTask = null;
Task<bool>? sItTask = null;
// Enumerator whose MoveNextAsync completed most recently.
IAsyncEnumerator<object?>? it = null;
while (true) {
Task<bool> task;
try {
// Start a MoveNextAsync only for an enumerator that has none outstanding.
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
// Primary exhausted: notify the caller and end the merged sequence.
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
// Wait for whichever MoveNextAsync calls are still outstanding before ending;
// awaiting a faulted task rethrows its exception from inside this catch block.
if (pItTask != null) await pItTask;
if (sItTask != null) await sItTask;
yield break;
}
// Kept outside the try: C# does not allow `yield return` inside a try block that has a catch clause.
if (task.Result) yield return it.Current;
}
}
根据 TheodorZoulias/shingo 的评论,我发现了问题并修复了它:
/// <summary>
/// Interleaves <paramref name="primary"/> with <paramref name="secondary"/>: each iteration yields the
/// next item from whichever enumerator completes its move first, and the merged sequence ends when the
/// primary sequence ends.
/// </summary>
/// <param name="primary">Main sequence; when it reports no more items the merge stops.</param>
/// <param name="secondary">Side sequence whose elements are yielded between the primary's items.</param>
/// <param name="primaryFinished">Invoked when the primary sequence reports that it has no more items.</param>
/// <param name="ct">Token passed to both enumerators and to every move call.</param>
/// <returns>The merged sequence of items from both sources.</returns>
private static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<IList<object?>> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
    var pIt = primary.GetAsyncEnumerator(ct);
    var sIt = secondary.GetAsyncEnumerator(ct);
    // CAf is defined outside this block - presumably a ConfigureAwait(false) shorthand; confirm.
    var _pIt = pIt.CAf();
    var _sIt = sIt.CAf();
    // In-flight move per enumerator; null means none is outstanding.
    Task<bool>? pItTask = null;
    Task<bool>? sItTask = null;
    // Set once the secondary reports completion so it is never polled again. Without this,
    // every iteration would start a move that completes immediately with false and the loop
    // would spin until the primary produced its next item.
    var secondaryCompleted = false;
    try {
        IAsyncEnumerator<object?>? it = null;
        while (true) {
            pItTask ??= pIt.MoveNextAsync(ct).AsTask();
            if (!secondaryCompleted) {
                sItTask ??= sIt.MoveNextAsync(ct).AsTask();
            }

            // With the secondary exhausted there is only the primary left to wait for.
            Task<bool> task = sItTask is null
                ? await Task.WhenAny(pItTask).CAf()
                : await Task.WhenAny(pItTask, sItTask).CAf();

            // NOTE(review): Task<T>.Result on a faulted task throws AggregateException, so a
            // source failure reaches the consumer wrapped - confirm that is intended.
            if (pItTask == task) {
                pItTask = null;
                it = pIt;
                if (!task.Result) {
                    primaryFinished();
                    yield break;
                }
            } else {
                sItTask = null;
                it = sIt;
                if (!task.Result) {
                    secondaryCompleted = true;
                }
            }

            if (task.Result) {
                yield return it.Current;
            }
        }
    } finally {
        // Best-effort cleanup: disposal may throw (for example while a move is still
        // pending), and that must not replace the exception already propagating.
        try { await _pIt.DisposeAsync(); } catch { }
        try { await _sIt.DisposeAsync(); } catch { }
    }
}
关键在于包裹 `DisposeAsync` 调用的 try/catch,因为这些调用本身可能会抛出异常。