我有一个Document DB存储库类,它有一个get方法,如下所示:
private static DocumentClient client;
public async Task<TEntity> Get(string id, string partitionKey = null)
{
try
{
RequestOptions requestOptions = null;
if (partitionKey != null)
{
requestOptions = new RequestOptions { PartitionKey = new PartitionKey(partitionKey) };
}
var result = await client.ReadDocumentAsync(
UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id),
requestOptions);
return (TEntity)(dynamic)result.Resource;
}
catch (DocumentClientException e)
{
// Have logic for different exceptions actually
throw;
}
}
我有两个集合 - Collection1和Collection2。 Collection1是非分区的,而Collection2是分区的。
在客户端,我创建了两个存储库对象,每个集合对应一个。
private static DocumentDBRepository<Collection1Item> collection1Repository = new DocumentDBRepository<Collection1Item>("Collection1");
private static DocumentDBRepository<Collection2Item> collection2Repository = new DocumentDBRepository<Collection2Item>("Collection2");
List<Collection1Item> collection1Items = await collection1Repository.GetItemsFromCollection1(); // Selects first forty documents based on time
List<UIItem> uiItems = new List<UIItem>();
foreach (var item in collection1Items)
{
var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId); // TargetId is my partition key for Collection2
uiItems.Add(new UIItem
{
ItemId = item.ItemId,
Collection1Reference = item.Id,
TargetId = item.TargetId,
Collection2Reference = item.Collection2Reference,
Value = collection2Item.Value
});
}
这很好用。但由于它与foreach一起顺序发生,我想要并行执行这些Get调用。当我这样做并行如下:
ConcurrentBag<UIItem> uiItems = new ConcurrentBag<UIItem>();
collection1Items.AsParallel().ForAll(async item => {
var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId); // TargetId is my partition key for Collection2
uiItems.Add(new UIItem
{
ItemId = item.ItemId,
Collection1Reference = item.Id,
TargetId = item.TargetId,
Collection2Reference = item.Collection2Reference,
Value = collection2Item.Value
});
}
);
它不起作用,uiItems总是空的。
您不需要Parallel.For来同时运行异步操作。如果它们是真正的异步,它们已经同时运行。
您可以收集从每个操作返回的任务,只需在所有任务上调用等待Task.WhenAll()。如果修改lambda以创建并返回UIItem,则await Task.WhenAll()
的结果将是UIItems的集合。无需从并发操作内部修改全局状态。
例如:
var itemTasks = collection1Items.Select(async item =>
{
var collection2Item = await storageRepository.Get(item.Collection2Reference, item.TargetId);
return new UIItem
{
ItemId = item.ItemId,
Collection1Reference = item.Id,
TargetId = item.TargetId,
Collection2Reference = item.Collection2Reference,
Value = collection2Item.Value
}
});
var results= await Task.WhenAll(itemTasks);
但需要注意的是 - 这将同时解雇所有Get
操作。这可能不是你想要的,特别是在调用具有速率限制的服务时。
尝试简单地启动任务并在最后等待所有任务。这将导致并行执行。
var tasks = collection1Items.Select(async item =>
{
//var collection2Item = await storageRepository.Get...
return new UIItem
{
//...
};
});
var uiItems = await Task.WhenAll(tasks);
PLINQ在使用内存构造并使用尽可能多的线程时非常有用,但如果与async-await技术(用于在访问外部资源时释放线程)一起使用,则最终会得到奇怪的结果。