Parallel.ForEach() 没有产生结果[重复]

问题描述 投票:0回答:2

我正在尝试使用

Parallel.ForEach()
并行查询 mongo-db 但我没有得到任何结果。但是,当我尝试在常规
foreach
循环中运行相同的操作时,我能够执行预期的任务。

var exceptions = new ConcurrentQueue<Exception>();
var secondaryObjectsDictionaryCollection = new Dictionary<string, List<JObject>>();

// This works
foreach(var info in infos)
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
}

//This does not
Parallel.ForEach(infos, async info =>
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
});

我想并行执行此任务,只是因为涉及不同的 mongodb 集合,并且还可以减少响应时间。

我无法弄清楚我的并行循环出了什么问题。 并行执行这些任务的任何其他方法也将起作用。

c# asynchronous async-await task-parallel-library parallel.foreach
2个回答
6
投票

让我们看一下更简单的示例来说明相同的问题

您有与此类似的代码

var results = new Dictionary<int, int>();

Parallel.ForEach(Enumerable.Range(0, 5), async index =>
{
  var result = await DoAsyncJob(index);
  results.TryAdd(index, result);
});

您的代码无法运行,因为表达式

async index => {...}

返回未等待

的任务

像这样

Parallel.ForEach(Enumerable.Range(0, 5), index => new Task());

顺便说一句,当您像示例中那样使用多线程时,您应该使用 ConcurrentDictionary 而不是 Dictionary,当您进行并行更新以避免错误和死锁时

这里最好的解决方案是不要使用并行循环,而是使用 Task.WhenAll

var tasks = Enumerable.Range(0, 5).Select(async index =>
{
  var result = await DoAsyncJob(index);
  results.TryAdd(index, result);
});

await Task.WhenAll(tasks);

4
投票

Parallel.ForEach 与传入

async
方法不兼容。如果您想要类似于 Parallel.ForEach 的东西,您可以使用Dataflow,它是 ActionBlock。

var workerBlock = new ActionBlock<Info>(async info => 
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        //Note this is not thread safe and you need to put a lock around it.
        lock (secondaryObjectsDictionaryCollection) 
        {
            secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
        }
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
foreach(var info in infos)
{
    workerBlock.Post(info);
}
workerBlock.Complete();
© www.soinside.com 2019 - 2024. All rights reserved.