我正在寻找JoinBlock的替代方法,该方法可以通过n-TransformBlocks链接到,并将所有TransformBlock源块的消息合并/合并在一起,以便将这样的集合传递给另一个数据流块。
JoinBlock可以很好地完成工作,但仅限于最多连接3个源块。它也遭受许多低效率的困扰(加入2个源块的偶数类型(ints)非常慢)。有没有办法让任务从TransformBlocks返回并等待所有TransformBlocks都完成一个要传递的任务,然后再接受Task<item>
?
还有其他想法吗?我可能有1-20个这样的转换块,在传递联接的项目集合之前,需要将哪些项目联接在一起。每个转换块都保证为“已转换”的每个输入项恰好返回一个输出项。
编辑:要求澄清:
根据之前的一个问题,我按如下所示设置了JoinBlocks:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target1);
transformBlock2.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}
一种方法是在BatchBlock
设置为Greedy
的情况下使用false
。在此配置中,该块将不执行任何操作,直到n
个不同块中有n
个项目等待使用(其中n
是您在创建BatchBlock
时设置的编号)。发生这种情况时,它将立即消耗所有n
个项目,并生成一个包含所有项目的数组。
此解决方案的一个警告是,未对结果数组进行排序:您不会知道哪个项目来自哪个来源。而且我不知道它的性能与JoinBlock
相比如何,您必须自己进行测试。 (尽管我会理解,由于非贪婪消费所必需的开销,因此以这种方式使用BatchBlock
是否会更慢。)
如果要对每个项目执行多个并行操作,恕我直言,在单个块内执行这些操作更为合理,而不是将它们拆分为多个块,然后尝试将独立结果再次合并为单个对象。所以我的建议是做这样的事情:
var block = new TransformBlock<MyClass, MyClass>(async item =>
{
Task<SomeType1> task1 = Task.Run(() => CalculateProperty1(item.Id));
Task<SomeType2> task2 = Task.Run(() => CalculateProperty2(item.Id));
await Task.WhenAll(task1, task2).ConfigureAwait(false);
item.Property1 = task1.Result;
item.Property2 = task2.Result;
return item;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2
});
在上述示例中,类型为MyClass
的项目通过TransformBlock
。使用每个属性的单独Property1
并行计算每个项目的属性Property2
和Task
。然后等待两个任务,当两个任务都完成时,将结果分配给该项目的属性。最后,返回已处理的项目。
您要使用这种方法唯一要注意的是,并行度将是内部并行操作与块的MaxDegreeOfParallelism
选项的乘积。因此,在上述示例中,并行度为2 x 2 =4。确切地说,这将是最大并行度,因为两个内部计算之一可能会比另一个慢。因此,在任何给定的时刻,实际的并行度可以在2到4之间。