此数据流网络具有单个分支,并产生具有正确结果的正确文本输出。为什么没有完成?
// Connect multiple blocks
// source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
// |-> multiply2 -> writeListOut
var source = new BufferBlock<List<int>>();
var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
{
return l.Select(_l => (double)_l).ToList();
});
source.LinkTo(convertToDouble);
Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
{
return l.Select(_l => _l * 10.0).ToList();
};
var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
convertToDouble.LinkTo(multiply);
var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
{
return l;
});
multiply.LinkTo(multiplyBuffer);
var summation = new TransformBlock<List<double>, double>((List<double> l) =>
{
return l.Sum();
});
multiplyBuffer.LinkTo(summation);
var writeOut = new ActionBlock<double>((double d) =>
{
Console.WriteLine("Writing out: " + d.ToString());
});
summation.LinkTo(writeOut);
var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
multiplyBuffer.LinkTo(multiply2);
var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
{
Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l =>
_l.ToString()).ToList()));
});
multiply2.LinkTo(writeListOut);
source.Post(new List<int> { 1, 2, 3 });
Task.Run(async () =>
{
await Task.Delay(3000);
Console.WriteLine("posting 2nd...");
source.Post(new List<int> { 4, 5, 6 });
source.Complete();
});
// Never completes
try
{
writeOut.Completion.Wait();
writeListOut.Completion.Wait();
}
catch (AggregateException ex)
{
ex.Handle(e =>
{
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
return true;
});
}
我已经注意到,如果省略了Completion.wait()
调用,那么程序将返回。在执行网络时不会观察到任何错误。
样本输出:
写作清单:100、200、300写出:60发布第二...写出:150写出清单:400、500、600(挂起)
预期输出:
写作清单:100、200、300写出:60发布第二...写出:150写出清单:400、500、600(返回)
在TPL中,默认情况下,源完成不传递给其他块。
您需要构造一个System.Threading.Tasks.Dataflow.DataflowLinkOptions
并将其PropagateCompletion
属性设置为true
,然后将其传递到您对LinkTo
的调用中。
或者,您可以依次在所有块上调用Complete
方法。