我有一个 TPL 数据流管道,其中目标块链接到两个传播块,然后这两个块都链接到源块。全部都与
PropagateCompletion = true
相连。第一个传播块与仅接受偶数的过滤器链接,第二个传播块接受所有剩余消息。
发布最后一条消息后,我将第一个块设置为完成。不过似乎存在竞争条件。最后一个块似乎有时处理所有值,但有时仅处理第一个传播块接受的值以及第二个传播块接受的部分值。
我觉得存在竞争条件。但我不知道如何正确指示最终源块一切都已完成,只有在链接到它的两个传播块转发所有消息之后。
这是我的代码,简化为一个简单的示例:
internal static class Program
{
public static async Task Main(string[] args)
{
var linkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var bufferBlock = new BufferBlock<int>();
var fork1 = new TransformBlock<int, int>(n => n);
var fork2 = new TransformBlock<int, int>(n =>
{
Thread.Sleep(100);
return n;
});
var printBlock = new ActionBlock<int>(Console.WriteLine);
bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
fork1.LinkTo(printBlock, linkOptions);
fork2.LinkTo(printBlock, linkOptions);
for (var n = 1; n <= 10; ++n)
{
bufferBlock.Post(n);
}
bufferBlock.Complete();
await printBlock.Completion;
}
}
输出:
2
4
6
8
10
我希望它输出:
2
4
6
8
10
1
3
5
7
9
数据流图中有一个菱形,导致完成通过两个分支中的任何一个更快地传播,从而使最终块过早完成。
可以使用任务延续来自定义最后一个块的完成:
...
var printBlock = new ActionBlock<int>(Console.WriteLine);
bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
fork1.LinkTo(printBlock); // no completion propagation
fork2.LinkTo(printBlock);
Task.WhenAll(fork1.Completion, fork2.Completion)
.ContinueWith(t => printBlock.Complete(),
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
for (var n = 1; n <= 10; ++n)
{
bufferBlock.Post(n);
}
bufferBlock.Complete();
await printBlock.Completion;
将
PropagateCompletion
更改为 false 将修复此问题。
var linkOptions = new DataflowLinkOptions
{
PropagateCompletion = false
};
这样fork 1完成不会停止打印块,你可以在最后手动完成它。
回答自己,因为我在另一个问题这里找到了答案。
问题是 fork1 和 fork2 都通过
PropagateCompletion = true
链接到 printBlock。即使 fork1 跳过了一些消息,它仍然会将完成传播到 printBlock,导致 printBlock 在 fork2 处理任何或所有消息之前完成。
解决办法是更换
fork1.LinkTo(printBlock, linkOptions);
fork2.LinkTo(printBlock, linkOptions);
与
fork1.LinkTo(printBlock);
fork2.LinkTo(printBlock);
Task.WhenAll(fork1.Completion, fork2.Completion).ContinueWith(_ => printBlock.Complete());