我有一个返回异步枚举器的方法
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var item in ListOfWorkItems)
{
yield return DoWork(item);
}
}
来电者:
public async Task LogResultsAsync()
{
await foreach (var result in DoWorkAsync())
{
Console.WriteLine(result);
}
}
因为
DoWork
是一个昂贵的操作,我更喜欢以某种方式并行化它,所以它的工作原理类似于:
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
Parallel.ForEach(ListOfWorkItems, item =>
{
yield return DoWork(item);
});
}
但是我无法从内部进行收益回报
Parallel.Foreach
所以只是想知道最好的方法是什么?
返回结果的顺序并不重要。
谢谢。
编辑:抱歉,我在
DoWorkAsync
中遗漏了一些代码,它确实在等待一些我只是没有将其放入上面的代码中的东西,因为这与问题不太相关。现已更新
Edit2:
DoWork
在我的例子中主要是 I/O 限制,它从数据库读取数据。
这是使用 TPL Dataflow
库中的
TransformBlock
的基本实现:
public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
// Define the dataflow block
var block = new TransformBlock<IWorkItem, IResult>(async item =>
{
return await TransformAsync(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10, // the default is 1
EnsureOrdered = false // the default is true
});
// Feed the block with input data
foreach (var item in workItems)
{
block.Post(item);
}
block.Complete();
// Stream the block's output as IAsyncEnumerable
while (await block.OutputAvailableAsync())
{
while (block.TryReceive(out var result))
{
yield return result;
}
}
// Propagate the first exception, if any.
await block.Completion;
}
此实现并不完美,因为如果
IAsyncEnumerable
的使用者过早放弃枚举,TransformBlock
将继续在后台工作,直到处理完所有工作项。此外,它不支持取消,而所有受人尊敬的 IAsyncEnumerable
制作方法都应该支持取消。这些缺失的功能可以相对容易地添加。如果您有兴趣添加它们,请查看this问题。
另一个缺陷是,如果
await TransformAsync(item)
抛出 OperationCanceledException
,该错误将被抑制。这是 TPL 数据流的设计行为。
DataflowBlock.ReceiveAllAsync
,可以简化块输出的流式传输。但有一个问题。有关详细信息,请参阅此答案。
根据 canton7 的建议,您可以使用
AsParallel
代替 Parallel.ForEach
。
这可以在标准
foreach
循环中使用,您可以在其中产生结果:
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
{
yield return result;
}
}
正如 Theodor Zoulias 所提到的,返回的可枚举实际上根本不是异步的。
如果您只需要使用
await foreach
来使用它,这应该不是问题,但更明确地说,您可以返回 IEnumerable
并让调用者并行化它:
public async Task<IEnumerable<Item>> DoWorkAsync()
{
await Something();
return ListOfWorkItems;
}
// Caller...
Parallel.ForEach(await DoWorkAsync(), item =>
{
var result = DoWork(item);
//...
});
尽管如果需要在多个地方调用,这可能不太可维护
System.Threading.Channels 的现代解决方案(可用 自 .NET Core 3 起)和 Parallel.ForEachAsync(自 .NET 6 起可用),以限制最大并行度。
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace StackOverflow.SampleCode;
public class ParallelExecutionException<T> : Exception
{
internal ParallelExecutionException(T item, Exception innerException) : base(innerException.Message, innerException)
{
Item = item;
}
public T Item { get; }
public new Exception InnerException => base.InnerException!;
}
public static class AsyncEnumerableExtensions
{
public static async IAsyncEnumerable<TOutput> AsParallelAsync<TInput, TOutput>(this IAsyncEnumerable<TInput> source, int maxDegreeOfParallelism, Func<TInput, CancellationToken, Task<TOutput>> transform, bool aggregateException = false, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
if (maxDegreeOfParallelism < 1)
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
var channelOptions = new UnboundedChannelOptions { SingleReader = true };
var channel = Channel.CreateUnbounded<TOutput>(channelOptions);
_ = Task.Run(async () =>
{
var exceptions = new List<Exception>();
var writer = channel.Writer;
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
CancellationToken = cancellationToken,
};
await Parallel.ForEachAsync(source, parallelOptions, async (item, ct) =>
{
try
{
var result = await transform(item, ct);
await writer.WriteAsync(result, ct);
}
catch (Exception exception)
{
var parallelExecutionException = new ParallelExecutionException<TInput>(item, exception);
if (aggregateException)
{
exceptions.Add(parallelExecutionException);
}
else
{
writer.Complete(parallelExecutionException);
}
}
});
writer.Complete(exceptions.Any() ? new AggregateException(exceptions) : null);
}, cancellationToken);
await foreach (var result in channel.Reader.ReadAllAsync(cancellationToken))
{
yield return result;
}
}
}
使用此解决方案,您可以选择是在第一次遇到异常时中止 (aggregateException = false),还是继续处理并在处理所有源项后抛出 AggregateException
(aggregateException = true)。