我需要构建将处理许多消息的TPL数据流管道。因为有很多消息,所以我不能简单地将它们Post
放入BufferBlock
的无限队列中,否则我将面临内存问题。因此,我想使用BoundedCapacity = 1
选项禁用队列,并使用MaxDegreeOfParallelism
使用并行任务处理,因为我的TransformBlock
可能需要花费一些时间来处理每条消息。我还使用PropagateCompletion
完成所有操作,并且无法沿管道传播。
但是当第一条消息刚发生错误时,我正面临错误处理的问题:调用await SendAsync
只是将我的应用切换为无限等待。
我已将案例简化为示例控制台应用程序:
var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
BoundedCapacity = 1
});
var process_block = new ActionBlock<int>(x =>
{
throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2,
BoundedCapacity = 1
});
data_buffer.LinkTo(process_block,
new DataflowLinkOptions { PropagateCompletion = true });
for (var k = 1; k <= 5; k++)
{
await data_buffer.SendAsync(k);
Console.WriteLine("Send: {0}", k);
}
data_buffer.Complete();
await process_block.Completion;
这是预期的行为。如果发生“下游”故障,则该错误不会在网格上“向后”传播。网格期望您检测到该故障(例如,通过process_block.Completion
)并予以解决。
如果要向后传播错误,则可以在下游块故障的情况下,在上游块await
上加一个process_block.Completion
或在faults上继续。
请注意,这不是唯一可行的解决方案;您可能要重建网格的该部分或将源链接到替代目标。源块没有故障,因此它们可以继续使用已修复的网格进行处理。