使用 TPL 数据流块处理异常

问题描述 投票:0回答:3

我有一个简单的 tpl 数据流,它基本上执行一些任务。 我注意到,当任何数据块中出现异常时,它不会被初始父块调用者捕获。 我添加了一些手动代码来检查异常,但似乎不是正确的方法。

if (readBlock.Completion.Exception != null
    || saveBlockJoinedProcess.Completion.Exception != null
    || processBlock1.Completion.Exception != null
    || processBlock2.Completion.Exception != null)
{
    throw readBlock.Completion.Exception;
}

我在网上查看了建议的方法,但没有看到任何明显的东西。 因此,我在下面创建了一些示例代码,并希望获得一些有关更好解决方案的指导:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                //ProcessB();
                ProcessA();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception in Process!");
                throw new Exception($"exception:{e}");
            }
            Console.WriteLine("Processing complete!");
            Console.ReadLine();
        }

        private static void ProcessB()
        {
            Task.WhenAll(Task.Run(() => DoSomething(1, "ProcessB"))).Wait();
        }

        private static void ProcessA()
        {
            var random = new Random();
            var readBlock = new TransformBlock<int, int>(x =>
            {
                try { return DoSomething(x, "readBlock"); }
                catch (Exception e) { throw e; }
            }); //1

            var braodcastBlock = new BroadcastBlock<int>(i => i); // ⬅ Here

            var processBlock1 = new TransformBlock<int, int>(x =>
                DoSomethingAsync(5, "processBlock1")); //2
            var processBlock2 = new TransformBlock<int, int>(x =>
                DoSomethingAsync(2, "processBlock2")); //3

            //var saveBlock =
            //    new ActionBlock<int>(
            //    x => Save(x)); //4

            var saveBlockJoinedProcess =
                new ActionBlock<Tuple<int, int>>(
                x => SaveJoined(x.Item1, x.Item2)); //4

            var saveBlockJoin = new JoinBlock<int, int>();

            readBlock.LinkTo(braodcastBlock, new DataflowLinkOptions
                { PropagateCompletion = true });

            braodcastBlock.LinkTo(processBlock1,
                new DataflowLinkOptions { PropagateCompletion = true }); //5

            braodcastBlock.LinkTo(processBlock2,
                new DataflowLinkOptions { PropagateCompletion = true }); //6


            processBlock1.LinkTo(
                saveBlockJoin.Target1); //7

            processBlock2.LinkTo(
                saveBlockJoin.Target2); //8

            saveBlockJoin.LinkTo(saveBlockJoinedProcess,
                new DataflowLinkOptions { PropagateCompletion = true });

            readBlock.Post(1); //10
                               //readBlock.Post(2); //10

            Task.WhenAll(processBlock1.Completion,processBlock2.Completion)
                .ContinueWith(_ => saveBlockJoin.Complete());

            readBlock.Complete(); //12
            saveBlockJoinedProcess.Completion.Wait(); //13
            if (readBlock.Completion.Exception != null
                || saveBlockJoinedProcess.Completion.Exception != null
                || processBlock1.Completion.Exception != null
                || processBlock2.Completion.Exception != null)
            {
                throw readBlock.Completion.Exception;
            }
        }
        private static int DoSomething(int i, string method)
        {
            Console.WriteLine($"Do Something, callng method : { method}");
            throw new Exception("Fake Exception!");
            return i;
        }
        private static async Task<int> DoSomethingAsync(int i, string method)
        {
            Console.WriteLine($"Do SomethingAsync");
            throw new Exception("Fake Exception!");
            await Task.Delay(new TimeSpan(0, 0, i));
            Console.WriteLine($"Do Something : {i}, callng method : { method}");
            return i;
        }
        private static void Save(int x)
        {

            Console.WriteLine("Save!");
        }
        private static void SaveJoined(int x, int y)
        {
            Thread.Sleep(new TimeSpan(0, 0, 10));
            Console.WriteLine("Save Joined!");
        }
    }
}
c# .net task-parallel-library tpl-dataflow
3个回答
4
投票

我在网上查看了建议的方法,但没有看到任何明显的内容。

如果你有管道(或多或少),那么常见的方法是使用

PropagateCompletion
来关闭管道。如果您有更复杂的拓扑,那么您需要手动完成块。

就您而言,您在此处尝试传播:

Task.WhenAll(
    processBlock1.Completion,
    processBlock2.Completion)
    .ContinueWith(_ => saveBlockJoin.Complete());

但是这段代码不会传播异常。当

processBlock1.Completion
processBlock2.Completion
都完成时,
saveBlockJoin
就完成成功

更好的解决方案是使用

await
而不是
ContinueWith
:

async Task PropagateToSaveBlockJoin()
{
    try
    {
        await Task.WhenAll(processBlock1.Completion, processBlock2.Completion);
        saveBlockJoin.Complete();
    }
    catch (Exception ex)
    {
        ((IDataflowBlock)saveBlockJoin).Fault(ex);
    }
}
_ = PropagateToSaveBlockJoin();

使用

await
鼓励您处理异常,您可以通过将异常传递给
Fault
来传播异常来实现这一点。


2
投票
开箱即用的 TPL 数据流不支持在管道中向后传播错误,当块的容量有限时,这一点尤其令人烦恼。在这种情况下,下游块中的错误可能会导致其后面的块无限期地阻塞。我知道的唯一解决方案是使用取消功能,并取消所有块,以防有人失败。这是如何做到的。首先创建一个

CancellationTokenSource

var cts = new CancellationTokenSource();
然后逐个创建块,在所有块的选项中嵌入相同的

CancellationToken

var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 10, CancellationToken = cts.Token }; var block1 = new TransformBlock<double, double>(Math.Sqrt, options); var block2 = new ActionBlock<double>(Console.WriteLine, options);
然后将块链接在一起,包括 

PropagateCompletion

 设置:

block1.LinkTo(block2, new DataflowLinkOptions { PropagateCompletion = true });
最后使用扩展方法,在出现异常时触发

CancellationTokenSource

的取消:

block1.OnFaultedCancel(cts); block2.OnFaultedCancel(cts);

OnFaultedCancel

扩展方法如下所示:

public static class DataflowExtensions { public static void OnFaultedCancel(this IDataflowBlock dataflowBlock, CancellationTokenSource cts) { _ = dataflowBlock.Completion.ContinueWith(_ => cts.Cancel(), default, TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } }


2024 年更新: 实际上,上面的简单解决方案并不能保证管道 100% 完成。 TPL Dataflow 中存在一个 bug,该 bug 已已知三年(自 2021 年 5 月起),可能会导致 TPL Dataflow 管道无限期挂起。如果一系列事件以完全正确(错误)的顺序发生,那么仅仅运气不好就可能会触发该错误。它可以在实验室中一致地再现。微软甚至还没有对这个错误做出回应,我也不指望它会很快得到解决(如果有的话)。您可以在这个答案中找到更多详细信息,包括复制品和解决方案。

坦率地说,我不会建议任何人使用 TPL Dataflow 作为任何关键任务应用程序的管道工具。 TPL 数据流是为了创建复杂的数据流网格而不是简单的管道而发明的,因此它在执行简单任务时表现不佳也就不足为奇了。这是一个有趣的玩具,但不是您开展业务的可靠工具。


0
投票
乍一看,如果只有一些小问题(不看你的架构)。在我看来,你混合了一些新的和一些旧的结构。并且有一些代码部分是不必要的。

例如:

private static void ProcessB() { Task.WhenAll(Task.Run(() => DoSomething(1, "ProcessB"))).Wait(); }

使用Wait()方法,如果发生任何异常,它们将被包装在System.AggregateException中。在我看来,这样更好:

private static async Task ProcessBAsync() { await Task.Run(() => DoSomething(1, "ProcessB")); }

使用 async-await,如果发生异常,await 语句将重新抛出第一个异常,该异常被包装在 System.AggregateException 中。这允许您尝试捕获具体的异常类型并仅处理您真正可以处理的情况。

另一件事是你的代码的这一部分:

private static void ProcessA() { var random = new Random(); var readBlock = new TransformBlock<int, int>( x => { try { return DoSomething(x, "readBlock"); } catch (Exception e) { throw e; } }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); //1

为什么捕获异常只是为了重新抛出它?在这种情况下,try-catch 是多余的。

这里:

private static void SaveJoined(int x, int y) { Thread.Sleep(new TimeSpan(0, 0, 10)); Console.WriteLine("Save Joined!"); }

使用await 会好很多

Task.Delay(....)

。使用 
Task.Delay(...)
,您的应用程序将不会冻结。

© www.soinside.com 2019 - 2024. All rights reserved.