是否有任何方法可以限制TPL数据流限制的性能下降?
我有一个复杂的组件管道,并试图限制所需的内存需求。我从多个文件并行读取,管道中的组件可能会从这些文件的随机部分中读取一些内容,其余组件则进行CPU绑定操作。
我使用通用的测试方法将性能测试台简化为这些测试。
private void TPLPerformaceTest(int generateNumbers,
ExecutionDataflowBlockOptions transformBlockOptions)
{
var transformBlock = new TransformBlock<int, int>(i => i, transformBlockOptions);
var storedCount = 0;
var generatedCount = 0;
var store = new ActionBlock<int>(i => Interlocked.Increment(ref storedCount));
transformBlock.LinkTo(store);
transformBlock.Completion.ContinueWith(_ => store.Complete());
for (int i = 0; i < generateNumbers; i++)
{
transformBlock.SendAsync(i).Wait(); //To ensure delivery
Interlocked.Increment(ref generatedCount);
}
transformBlock.Complete();
store.Completion.Wait();
Assert.IsTrue(generatedCount == generateNumbers);
Assert.IsTrue(storedCount == generateNumbers);
}
第一个没有无节流。在我的CPU上,大约需要花费[[12s来完成,消耗约[<800MB的RAM,平均CPU利用率约为35%。
[Test]
public void TPLPerformaceUnlimitedTest()
{
var generateNumbers = 100_000_000;
this.TPLPerformaceTest(generateNumbers,new ExecutionDataflowBlockOptions());
}
仅将BoundedCapacity设置为int.MaxValue的第二项测试,因此完全没有限制,花费
20-30s
完成,消耗2.1GB的RAM,平均CPU利用率约为50%。根据手册,默认情况下,BoundedCapacity应该设置为int.MaxValue,所以我看不到性能下降的原因。[Test]
[Sequential]
public void TPLPerformaceBounedCapacityTest()
{
var generateNumbers = 100_000_000;
this.TPLPerformaceTest(generateNumbers,new ExecutionDataflowBlockOptions()
{ BoundedCapacity = Int32.MaxValue });
}
第三个测试限制要生成数字的有界容量/ 1000,因此为100,000。它需要60秒钟才能完成,并消耗
450MB
RAM,平均CPU利用率约为60%。[Test]
[Sequential]
public void TPLPerformaceBounedCapacityTenthTest()
{
var generateNumbers = 100_000_000;
this.TPLPerformaceTest(generateNumbers,new ExecutionDataflowBlockOptions()
{ BoundedCapacity = generateNumbers / 1000 });
}
第四测试限制MaxDegreeOfParallelism到-1,根据手册的无限制。它消耗了
27GB
的RAM,平均CPU利用率约为85%,并且在5分钟内尚未完成。[Test]
[Sequential]
public void TPLPerformaceMaxDegreeOfParallelismTest()
{
var generateNumbers = 100_000_000;
this.TPLPerformaceTest(generateNumbers, new ExecutionDataflowBlockOptions()
{ MaxDegreeOfParallelism = -1 });
}
由于我的合理期望,所有方法似乎都会非常严重地影响性能,并且不起作用。 transformBlock.SendAsync(i).Wait(); //To ensure delivery
此blocks当前线程在完成传递之前。您应该切换到
await
来释放线程以执行其他任务:
await transformBlock.SendAsync(i); //To ensure delivery
更新:我对你的话感到困惑
根据手册,
BoundedCapacity
应该默认设置为BoundedCapacity
因为这是不正确的,int.MaxValue
:
int.MaxValue
from official documentation中包含的大多数数据流块都支持有界容量的规范。 这是该块可以存储并在任何一次飞行中的项目数的限制。默认情况下,此值初始化为BoundedCapacity
(BoundedCapacity
),表示没有限制。在运行此代码后,您可以在此处看到所有默认值:
System.Threading.Tasks.Dataflow.dll
DataflowBlockOptions.Unbounded
因此将DataflowBlockOptions.Unbounded
设置为-1
的第二个测试does添加了一个限制,这增加了对块缓冲区中的位置可用性的一些检查。
您可以在第三次测试中看到类似的行为,它消耗的内存比第二次要少得多,但是对缓冲区进行更多的检查和等待时间以释放空间,因此它的工作速度较慢,但分配的内存很少。
此外,您还可以在屏幕截图中看到var options = new ExecutionDataflowBlockOptions();
等于:
BoundedCapacity
默认情况下,单个数据流块一次只处理一条消息,将所有尚未处理的消息排队,以便在当前处理的消息完成时可以对其进行处理。将此参数设置为
BoundedCapacity
后,您将打开潘多拉魔盒,因为所有消息都正在执行由同一任务计划程序同时执行,再次是int.MaxValue
:如果设置为
int.MaxValue
(MaxDegreeOfParallelism
),则可以同时处理任意数量的消息,而最大消息数量则由数据流块所针对的基础调度程序自动管理。正如我在内存消耗中看到的那样,任务调度程序决定为每条消息启动新线程,因为线程池中没有可用的线程,每条消息大约占用MaxDegreeOfParallelism
,因此您大约有1
个线程彼此争夺CPU时间。如您所见,他们并不擅长此事。推荐的并行度通常为MaxDegreeOfParallelism
,因此,如果要加快one块的速度,可以将
MaxDegreeOfParallelism
设置为此属性。但是,在更复杂的情况下,这并不总是最好的选择,因为其他块将停止等待CPU时间。那么您的
-1
是什么?