在 C#8 IAsyncEnumerable 中并行化yield return<T>

问题描述 投票:0回答:3

我有一个返回异步枚举器的方法

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        foreach (var item in ListOfWorkItems)
        {
            yield return DoWork(item);
        }
    }

来电者:

    public async Task LogResultsAsync()
    {
        await foreach (var result in DoWorkAsync())
        {
            Console.WriteLine(result);
        }
    }

因为

DoWork
是一个昂贵的操作,我更喜欢以某种方式并行化它,所以它的工作原理类似于:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        Parallel.ForEach(ListOfWorkItems, item =>
        {
            yield return DoWork(item);
        });
    }

但是我无法从内部进行收益回报

Parallel.Foreach
所以只是想知道最好的方法是什么?

返回结果的顺序并不重要。

谢谢。

编辑:抱歉,我在

DoWorkAsync
中遗漏了一些代码,它确实在等待一些我只是没有将其放入上面的代码中的东西,因为这与问题不太相关。现已更新

Edit2:

DoWork
在我的例子中主要是 I/O 限制,它从数据库读取数据。

c# asynchronous iasyncenumerable
3个回答
3
投票

这是使用 TPL Dataflow

 库中的 
TransformBlock 的基本实现:

public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
    // Define the dataflow block
    var block = new TransformBlock<IWorkItem, IResult>(async item =>
    {
        return await TransformAsync(item);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10, // the default is 1
        EnsureOrdered = false // the default is true
    });

    // Feed the block with input data
    foreach (var item in workItems)
    {
        block.Post(item);
    }
    block.Complete();

    // Stream the block's output as IAsyncEnumerable
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var result))
        {
            yield return result;
        }
    }

    // Propagate the first exception, if any.
    await block.Completion;
}

此实现并不完美,因为如果

IAsyncEnumerable
的使用者过早放弃枚举,
TransformBlock
将继续在后台工作,直到处理完所有工作项。此外,它不支持取消,而所有受人尊敬的
IAsyncEnumerable
制作方法都应该支持取消。这些缺失的功能可以相对容易地添加。如果您有兴趣添加它们,请查看this问题。

另一个缺陷是,如果

await TransformAsync(item)
抛出
OperationCanceledException
,该错误将被抑制。这是 TPL 数据流的设计行为。


.NET 6 更新: .NET 6 中引入了新的 API

DataflowBlock.ReceiveAllAsync
,可以简化块输出的流式传输。但有一个问题。有关详细信息,请参阅此答案


2
投票

根据 canton7 的建议,您可以使用

AsParallel
代替
Parallel.ForEach

这可以在标准

foreach
循环中使用,您可以在其中产生结果:

public async IAsyncEnumerable<IResult> DoWorkAsync()
{
    await Something();
    foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
    {
        yield return result;
    }
}

正如 Theodor Zoulias 所提到的,返回的可枚举实际上根本不是异步的。

如果您只需要使用

await foreach
来使用它,这应该不是问题,但更明确地说,您可以返回
IEnumerable
并让调用者并行化它:

public async Task<IEnumerable<Item>> DoWorkAsync()
{
    await Something();
    return ListOfWorkItems;
}

// Caller...
Parallel.ForEach(await DoWorkAsync(), item => 
{
    var result = DoWork(item);
    //...
});

尽管如果需要在多个地方调用,这可能不太可维护


1
投票
不幸的是,Theodor Zoulias 提出的解决方案有一个小问题:您无法选择聚合多个异常,因此仅传播第一个异常。 这是使用

System.Threading.Channels 的现代解决方案(可用 自 .NET Core 3 起)和 Parallel.ForEachAsync(自 .NET 6 起可用),以限制最大并行度。

#nullable enable using System; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace StackOverflow.SampleCode; public class ParallelExecutionException<T> : Exception { internal ParallelExecutionException(T item, Exception innerException) : base(innerException.Message, innerException) { Item = item; } public T Item { get; } public new Exception InnerException => base.InnerException!; } public static class AsyncEnumerableExtensions { public static async IAsyncEnumerable<TOutput> AsParallelAsync<TInput, TOutput>(this IAsyncEnumerable<TInput> source, int maxDegreeOfParallelism, Func<TInput, CancellationToken, Task<TOutput>> transform, bool aggregateException = false, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(source); if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism)); var channelOptions = new UnboundedChannelOptions { SingleReader = true }; var channel = Channel.CreateUnbounded<TOutput>(channelOptions); _ = Task.Run(async () => { var exceptions = new List<Exception>(); var writer = channel.Writer; var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism, CancellationToken = cancellationToken, }; await Parallel.ForEachAsync(source, parallelOptions, async (item, ct) => { try { var result = await transform(item, ct); await writer.WriteAsync(result, ct); } catch (Exception exception) { var parallelExecutionException = new ParallelExecutionException<TInput>(item, exception); if (aggregateException) { exceptions.Add(parallelExecutionException); } else { writer.Complete(parallelExecutionException); } } }); writer.Complete(exceptions.Any() ? new AggregateException(exceptions) : null); }, cancellationToken); await foreach (var result in channel.Reader.ReadAllAsync(cancellationToken)) { yield return result; } } }
使用此解决方案,您可以选择是在第一次遇到异常时中止 (aggregateException = false),还是继续处理并在处理所有源项后抛出 

AggregateException

 (aggregateException = true)。

© www.soinside.com 2019 - 2024. All rights reserved.