两分支数据流网络未完成

问题描述 投票:2回答:1

此数据流网络具有单个分支,并产生具有正确结果的正确文本输出。为什么没有完成?

            // Connect multiple blocks
            // source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
            //                                                        |-> multiply2 -> writeListOut
            var source = new BufferBlock<List<int>>();
            var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
            {
                return l.Select(_l => (double)_l).ToList();
            });
            source.LinkTo(convertToDouble);
            Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
            {
                return l.Select(_l => _l * 10.0).ToList();
            };
            var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            convertToDouble.LinkTo(multiply);
            var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
            {
                return l;
            });
            multiply.LinkTo(multiplyBuffer);
            var summation = new TransformBlock<List<double>, double>((List<double> l) =>
            {
                return l.Sum();
            });
            multiplyBuffer.LinkTo(summation);
            var writeOut = new ActionBlock<double>((double d) =>
            {
                Console.WriteLine("Writing out: " + d.ToString());
            });
            summation.LinkTo(writeOut);
            var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            multiplyBuffer.LinkTo(multiply2);
            var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
            {
                Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l => 
                    _l.ToString()).ToList()));
            });
            multiply2.LinkTo(writeListOut);

            source.Post(new List<int> { 1, 2, 3 });

            Task.Run(async () =>
            {
                await Task.Delay(3000);
                Console.WriteLine("posting 2nd...");
                source.Post(new List<int> { 4, 5, 6 });
                source.Complete();
            });

            // Never completes
            try
            {
                writeOut.Completion.Wait();
                writeListOut.Completion.Wait();
            }
            catch (AggregateException ex)
            {
                ex.Handle(e =>
                {
                    Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
                    return true;
                });
            }

我已经注意到,如果省略了Completion.wait()调用,那么程序将返回。在执行网络时不会观察到任何错误。

样本输出:

写作清单:100、200、300写出:60发布第二...写出:150写出清单:400、500、600(挂起)

预期输出:

写作清单:100、200、300写出:60发布第二...写出:150写出清单:400、500、600(返回)

c# task-parallel-library tpl-dataflow
1个回答
2
投票

在TPL中,默认情况下,源完成不传递给其他块。

您需要构造一个System.Threading.Tasks.Dataflow.DataflowLinkOptions并将其PropagateCompletion属性设置为true,然后将其传递到您对LinkTo的调用中。

或者,您可以依次在所有块上调用Complete方法。

© www.soinside.com 2019 - 2024. All rights reserved.