我有一个简单的 tpl 数据流,它基本上执行一些任务。 我注意到,当任何数据块中出现异常时,它不会被初始父块调用者捕获。 我添加了一些手动代码来检查异常,但似乎不是正确的方法。
if (readBlock.Completion.Exception != null
|| saveBlockJoinedProcess.Completion.Exception != null
|| processBlock1.Completion.Exception != null
|| processBlock2.Completion.Exception != null)
{
throw readBlock.Completion.Exception;
}
我在网上查看了建议的方法,但没有看到任何明显的东西。 因此,我在下面创建了一些示例代码,并希望获得一些有关更好解决方案的指导:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace TPLDataflow
{
class Program
{
static void Main(string[] args)
{
try
{
//ProcessB();
ProcessA();
}
catch (Exception e)
{
Console.WriteLine("Exception in Process!");
throw new Exception($"exception:{e}");
}
Console.WriteLine("Processing complete!");
Console.ReadLine();
}
private static void ProcessB()
{
Task.WhenAll(Task.Run(() => DoSomething(1, "ProcessB"))).Wait();
}
private static void ProcessA()
{
var random = new Random();
var readBlock = new TransformBlock<int, int>(x =>
{
try { return DoSomething(x, "readBlock"); }
catch (Exception e) { throw e; }
}); //1
var braodcastBlock = new BroadcastBlock<int>(i => i); // ⬅ Here
var processBlock1 = new TransformBlock<int, int>(x =>
DoSomethingAsync(5, "processBlock1")); //2
var processBlock2 = new TransformBlock<int, int>(x =>
DoSomethingAsync(2, "processBlock2")); //3
//var saveBlock =
// new ActionBlock<int>(
// x => Save(x)); //4
var saveBlockJoinedProcess =
new ActionBlock<Tuple<int, int>>(
x => SaveJoined(x.Item1, x.Item2)); //4
var saveBlockJoin = new JoinBlock<int, int>();
readBlock.LinkTo(braodcastBlock, new DataflowLinkOptions
{ PropagateCompletion = true });
braodcastBlock.LinkTo(processBlock1,
new DataflowLinkOptions { PropagateCompletion = true }); //5
braodcastBlock.LinkTo(processBlock2,
new DataflowLinkOptions { PropagateCompletion = true }); //6
processBlock1.LinkTo(
saveBlockJoin.Target1); //7
processBlock2.LinkTo(
saveBlockJoin.Target2); //8
saveBlockJoin.LinkTo(saveBlockJoinedProcess,
new DataflowLinkOptions { PropagateCompletion = true });
readBlock.Post(1); //10
//readBlock.Post(2); //10
Task.WhenAll(processBlock1.Completion,processBlock2.Completion)
.ContinueWith(_ => saveBlockJoin.Complete());
readBlock.Complete(); //12
saveBlockJoinedProcess.Completion.Wait(); //13
if (readBlock.Completion.Exception != null
|| saveBlockJoinedProcess.Completion.Exception != null
|| processBlock1.Completion.Exception != null
|| processBlock2.Completion.Exception != null)
{
throw readBlock.Completion.Exception;
}
}
private static int DoSomething(int i, string method)
{
Console.WriteLine($"Do Something, callng method : { method}");
throw new Exception("Fake Exception!");
return i;
}
private static async Task<int> DoSomethingAsync(int i, string method)
{
Console.WriteLine($"Do SomethingAsync");
throw new Exception("Fake Exception!");
await Task.Delay(new TimeSpan(0, 0, i));
Console.WriteLine($"Do Something : {i}, callng method : { method}");
return i;
}
private static void Save(int x)
{
Console.WriteLine("Save!");
}
private static void SaveJoined(int x, int y)
{
Thread.Sleep(new TimeSpan(0, 0, 10));
Console.WriteLine("Save Joined!");
}
}
}
我在网上查看了建议的方法,但没有看到任何明显的内容。
如果你有管道(或多或少),那么常见的方法是使用
PropagateCompletion
来关闭管道。如果您有更复杂的拓扑,那么您需要手动完成块。
就您而言,您在此处尝试传播:
Task.WhenAll(
processBlock1.Completion,
processBlock2.Completion)
.ContinueWith(_ => saveBlockJoin.Complete());
但是这段代码不会传播异常。当
processBlock1.Completion
和 processBlock2.Completion
都完成时,saveBlockJoin
就完成成功。
更好的解决方案是使用
await
而不是 ContinueWith
:
async Task PropagateToSaveBlockJoin()
{
try
{
await Task.WhenAll(processBlock1.Completion, processBlock2.Completion);
saveBlockJoin.Complete();
}
catch (Exception ex)
{
((IDataflowBlock)saveBlockJoin).Fault(ex);
}
}
_ = PropagateToSaveBlockJoin();
使用
await
鼓励您处理异常,您可以通过将异常传递给 Fault
来传播异常来实现这一点。
CancellationTokenSource
:
var cts = new CancellationTokenSource();
然后逐个创建块,在所有块的选项中嵌入相同的CancellationToken
:
var options = new ExecutionDataflowBlockOptions()
{ BoundedCapacity = 10, CancellationToken = cts.Token };
var block1 = new TransformBlock<double, double>(Math.Sqrt, options);
var block2 = new ActionBlock<double>(Console.WriteLine, options);
然后将块链接在一起,包括 PropagateCompletion
设置:
block1.LinkTo(block2, new DataflowLinkOptions { PropagateCompletion = true });
最后使用扩展方法,在出现异常时触发CancellationTokenSource
的取消:
block1.OnFaultedCancel(cts);
block2.OnFaultedCancel(cts);
OnFaultedCancel
扩展方法如下所示:
public static class DataflowExtensions
{
public static void OnFaultedCancel(this IDataflowBlock dataflowBlock,
CancellationTokenSource cts)
{
_ = dataflowBlock.Completion.ContinueWith(_ => cts.Cancel(), default,
TaskContinuationOptions.OnlyOnFaulted |
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
}
2024 年更新: 实际上,上面的简单解决方案并不能保证管道 100% 完成。 TPL Dataflow 中存在一个 bug,该 bug 已已知三年(自 2021 年 5 月起),可能会导致 TPL Dataflow 管道无限期挂起。如果一系列事件以完全正确(错误)的顺序发生,那么仅仅运气不好就可能会触发该错误。它可以在实验室中一致地再现。微软甚至还没有对这个错误做出回应,我也不指望它会很快得到解决(如果有的话)。您可以在这个答案中找到更多详细信息,包括复制品和解决方案。
坦率地说,我不会建议任何人使用 TPL Dataflow 作为任何关键任务应用程序的管道工具。 TPL 数据流是为了创建复杂的数据流网格而不是简单的管道而发明的,因此它在执行简单任务时表现不佳也就不足为奇了。这是一个有趣的玩具,但不是您开展业务的可靠工具。
例如:
private static void ProcessB()
{
Task.WhenAll(Task.Run(() => DoSomething(1, "ProcessB"))).Wait();
}
使用Wait()方法,如果发生任何异常,它们将被包装在System.AggregateException中。在我看来,这样更好:
private static async Task ProcessBAsync()
{
await Task.Run(() => DoSomething(1, "ProcessB"));
}
使用 async-await,如果发生异常,await 语句将重新抛出第一个异常,该异常被包装在 System.AggregateException 中。这允许您尝试捕获具体的异常类型并仅处理您真正可以处理的情况。
另一件事是你的代码的这一部分:
private static void ProcessA()
{
var random = new Random();
var readBlock = new TransformBlock<int, int>(
x =>
{
try { return DoSomething(x, "readBlock"); }
catch (Exception e)
{
throw e;
}
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); //1
为什么捕获异常只是为了重新抛出它?在这种情况下,try-catch 是多余的。
这里:
private static void SaveJoined(int x, int y)
{
Thread.Sleep(new TimeSpan(0, 0, 10));
Console.WriteLine("Save Joined!");
}
使用await 会好很多
Task.Delay(....)
。使用
Task.Delay(...)
,您的应用程序将不会冻结。