使用LINQ的AsParallel()并行运行异步函数

问题描述 投票:0回答:2

我有一个Document DB存储库类,它有一个get方法,如下所示:

private static DocumentClient client;
public async Task<TEntity> Get(string id, string partitionKey = null)
{
    try
    {
        RequestOptions requestOptions = null;
        if (partitionKey != null)
        {
            requestOptions = new RequestOptions { PartitionKey = new PartitionKey(partitionKey) };
        }

        var result = await client.ReadDocumentAsync(
            UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id),
            requestOptions);
        return (TEntity)(dynamic)result.Resource;
    }
    catch (DocumentClientException e)
    {
        // Have logic for different exceptions actually
        throw;
    }
}

我有两个集合 - Collection1和Collection2。 Collection1是非分区的,而Collection2是分区的。

在客户端,我创建了两个存储库对象,每个集合对应一个。

private static DocumentDBRepository<Collection1Item> collection1Repository = new DocumentDBRepository<Collection1Item>("Collection1");
private static DocumentDBRepository<Collection2Item> collection2Repository = new DocumentDBRepository<Collection2Item>("Collection2");

List<Collection1Item> collection1Items = await collection1Repository.GetItemsFromCollection1(); // Selects first forty documents based on time

List<UIItem> uiItems = new List<UIItem>();
foreach (var item in collection1Items)
{
    var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId);    // TargetId is my partition key for Collection2
    uiItems.Add(new UIItem
    {
        ItemId = item.ItemId,
        Collection1Reference = item.Id,
        TargetId = item.TargetId,        
        Collection2Reference = item.Collection2Reference,
        Value = collection2Item.Value
    });
}

这很好用。但由于它与foreach一起顺序发生,我想要并行执行这些Get调用。当我这样做并行如下:

ConcurrentBag<UIItem> uiItems = new ConcurrentBag<UIItem>();
collection1Items.AsParallel().ForAll(async item => {
    var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId);    // TargetId is my partition key for Collection2
    uiItems.Add(new UIItem
    {
        ItemId = item.ItemId,
        Collection1Reference = item.Id,
        TargetId = item.TargetId,        
        Collection2Reference = item.Collection2Reference,
        Value = collection2Item.Value
    });
  }
);

它不起作用,uiItems总是空的。

async-await azure-cosmosdb
2个回答
2
投票

您不需要Parallel.For来同时运行异步操作。如果它们是真正的异步,它们已经同时运行。

您可以收集从每个操作返回的任务,只需在所有任务上调用等待Task.WhenAll()。如果修改lambda以创建并返回UIItem,则await Task.WhenAll()的结果将是UIItems的集合。无需从并发操作内部修改全局状态。

例如:

var itemTasks = collection1Items.Select(async item =>
    {
        var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId);    
        return new UIItem
        {
            ItemId = item.ItemId,
            Collection1Reference = item.Id,
            TargetId = item.TargetId,        
            Collection2Reference = item.Collection2Reference,
            Value = collection2Item.Value
        }
    });

var results= await Task.WhenAll(itemTasks);

但需要注意的是 - 这将同时解雇所有Get操作。这可能不是你想要的,特别是在调用具有速率限制的服务时。


1
投票

尝试简单地启动任务并在最后等待所有任务。这将导致并行执行。

var tasks = collection1Items.Select(async item =>
{
    //var collection2Item = await storageRepository.Get...
    return new UIItem
    {
        //...
    };
});

var uiItems = await Task.WhenAll(tasks);

PLINQ在使用内存构造并使用尽可能多的线程时非常有用,但如果与async-await技术(用于在访问外部资源时释放线程)一起使用,则最终会得到奇怪的结果。

© www.soinside.com 2019 - 2024. All rights reserved.