使用多线程时,应用程序意外关闭

问题描述 投票:0回答:2

我按照这里描述的文章使用BlockingCollection尝试了一个非常简单的Producer Consumer方法:MS Docs来理解多线程。

我的生产者是一个单独的任务,它从XML文件中读取(有大约4000个节点)并将XElement节点推送到阻塞集合。

我的消费者将有多个线程从阻塞集合读取并将文件上传到基于XElement的站点。

这里的问题是每次我尝试运行它时程序意外关闭。它击中了制片人Task.Run但在此之后停止了。我无法理解原因。难道我做错了什么?它甚至没有击中catch区块。

代码如下:

            BlockingCollection<XElement> collection = new BlockingCollection<XElement>(100);                
            string metadataFilePath = exportLocation + listTitle + "\\Metadata\\" + exportJobId + ".xml";
            //create the producer
            Task.Run(() =>
            {                    
                //Process only the files that have not been uploaded                                   
                XDocument xmlFile = XDocument.Load(metadataFilePath);
                var query = from c in xmlFile.Elements("Items").Elements("Item")
                            where c.Attribute("IsUploaded").Value == "No"
                            select c;
                foreach (var item in query)
                {
                    collection.Add(item);
                }
                collection.CompleteAdding();
            });

            //process consumer
            Parallel.ForEach(collection, (new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 2 }), (metadata) => {
                ProcessItems();
            });
c# multithreading blockingcollection
2个回答
2
投票

answer by Nish26对于问题中的问题是正确的。

我建议用Microsoft TPL Dataflow解决你的生产者/消费者问题:

using System.Threading.Tasks.Dataflow;

var parallelBoundedOptions = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 100,
    MaxDegreeOfParallelism = 2,
};
var uploadItemBlock = new ActionBlock<XElement>(
    item => ProcessItem(item),
    parallelBoundedOptions
);
string metadataFilePath = exportLocation + listTitle + "\\Metadata\\" + exportJobId + ".xml";
XDocument xmlFile = XDocument.Load(metadataFilePath);
var query = from c in xmlFile.Elements("Items").Elements("Item")
            where c.Attribute("IsUploaded").Value == "No"
            select c;
foreach (var item in query)
{
    uploadItemBlock.SendAsync(item).Wait();
}
uploadItemBlock.Complete();
uploadItemBlock.Completion.Wait();

数据流使得更容易专注于生成和使用项目,而不是如何将它们从生产者传递给消费者。

问题中的实际问题是Parallel.Foreach正在使用BlockingCollection<T>.IEnumerable<T>.GetEnumerator而不是BlockingCollection<T>.GetConsumingEnumerable,如下所示:

static void Main()
{
    var collection = new BlockingCollection<int>(100);
    Task.Run(()=>
    {
        foreach (var element in Enumerable.Range(0, 100_000))
        {
            collection.Add(element);
        }
        collection.CompleteAdding();
    });

    Parallel.ForEach(
        collection, 
        new ParallelOptions { MaxDegreeOfParallelism = 2},
        i => Console.WriteLine(i));

    Console.WriteLine("Done");
}

立即打印“完成”

static void Main()
{
    var collection = new BlockingCollection<int>(100);
    Task.Run(()=>
    {
        foreach (var element in Enumerable.Range(0, 100_000))
        {
            collection.Add(element);
        }
        collection.CompleteAdding();
    });

    Parallel.ForEach(
        collection.GetConsumingEnumerable(), 
        new ParallelOptions { MaxDegreeOfParallelism = 2},
        i => Console.WriteLine(i));

    Console.WriteLine("Done");
}

打印所有数字


2
投票

假设您正在尝试运行控制台应用程序,我可以考虑以下问题:

  1. C#中的任务默认为后台线程,即它们无法使应用程序保持活动状态。如果主线程退出是前台线程,则后台线程也将停止执行。
  2. 考虑到#1,您的并行块可能会在生产者线程生成任何数据之前执行,因此程序退出也会导致后台生成器线程终止。尝试使用循环内的TryTake()从集合中读取消费者任务,并在程序中添加对Console.ReadLine()的调用,以确保在用户没有输入的情况下控制台无法退出。如果您想并行使用,请参见示例2 here

你可以看到更多的例子here。尝试在示例代码中注意以下事项:

  1. 使用块使用(BlockingCollection bc = new BlockingCollection())
  2. 在第一个线程中调用方法CompleteAdding(),指示集合将不再接受生产者添加的任何项目。一旦添加了所有项目,生产者线程就会调用它。在将集合标记为完成以进行添加之后,不允许添加到集合中,并且当集合为空时,尝试从集合中删除将不会等待。
  3. 在第二个示例中由使用者线程使用TryTake(out result)。消费者线程启动并尝试取出值。即使生产者线程没有添加任何项目,它也将继续等待,因为收集尚未通过生产者线程调用CompleteAdding()标记为IsAddingCompleted。当集合被标记为IsAddingCompleted并且集合为空时,消费者线程将从TryTake获得错误的返回值,即集合的IsCompleted属性变为true,允许消费者线程完成。

4.调用Console.ReadLine(),这样作为后台线程的任务都不会在没有完成的情况下终止。

希望这可以帮助。

© www.soinside.com 2019 - 2024. All rights reserved.