示例程序具有以下BatchBlock:new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });
,其中有60个int数据项被发送,并且消耗了单独的任务。
问题是await sourceBlock.SendAsync(i);
似乎没有等待,即使达到了BatchBlock有界容量,数据仍在不断地发送而不消耗任务首先取出任何项目。最终BatchBlock只接收2批10个int数据项。我希望await sourceBlock.SendAsync(i);
在发送20个项目时暂停执行,因为块的有界容量设置为10,最多2个组。然后在某些时候消耗任务将接收数据并且该过程将重复。
我已经附加了下面的代码,创建一个简单的控制台应用程序,将以下内容添加到main:
BatchBlockIssueReplication().GetAwaiter().GetResult();
调用的方法:
public static async Task BatchBlockIssueReplication()
{
var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });
// Reading data from the source block
Task fireAndForget = Task.Run(async () =>
{
while (!sourceBlock.Completion.IsCanceled)
{
await Task.Delay(1500);
if (await sourceBlock.OutputAvailableAsync() && sourceBlock.TryReceiveAll(out var results))
{
Console.WriteLine("Received: ");
foreach (var result in results)
{
Console.Write($"{result.Length} ");
}
Console.WriteLine();
}
}
});
for (int i = 0; i < 60; i++)
{
Console.WriteLine($"Sending {i} to the source block");
await sourceBlock.SendAsync(i);
}
Console.WriteLine("Finished sending data to the source block");
await Task.Delay(10000);
}
您尚未设置BoundedCapacity,它控制输入缓冲区中可以等待的项目数。超过这将使SendAsync
等待。
设置MaxNumberOfGroups属性,该属性是拒绝接收任何其他输入之前该块将生成的组数。
来自文档:
获取或设置块应生成的最大组数。
如果你想让你的块在inputbuffer中保持20个块并等待,你应该设置BoundedCapacity:
var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions
{
BoundedCapacity = 20
});
一旦达到最大值,await sourceBlock.SendAsync(i);
将不会暂停,因为该块主动拒绝更多项目。当发生这种情况时,SendAsync
返回false
,表示该块不接受新消息。如果您写出SendAsync
调用的结果,您可以看到块停止接收新消息的位置:
Sending 0 to the source block
True
Sending 1 to the source block
True
Sending 2 to the source block
True
Sending 3 to the source block
True
Sending 4 to the source block
True
Sending 5 to the source block
True
Sending 6 to the source block
True
Sending 7 to the source block
True
Sending 8 to the source block
True
Sending 9 to the source block
True
Sending 10 to the source block
True
Sending 11 to the source block
True
Sending 12 to the source block
True
Sending 13 to the source block
True
Sending 14 to the source block
True
Sending 15 to the source block
True
Sending 16 to the source block
True
Sending 17 to the source block
True
Sending 18 to the source block
True
Sending 19 to the source block
True
Sending 20 to the source block
False
Sending 21 to the source block
False
Sending 22 to the source block
False
Sending 23 to the source block
False
Sending 24 to the source block
False
Sending 25 to the source block
False
Sending 26 to the source block
False
Sending 27 to the source block
False
Sending 28 to the source block
False
Sending 29 to the source block
False
Sending 30 to the source block
False
Sending 31 to the source block
False
Sending 32 to the source block
False
Sending 33 to the source block
False
Sending 34 to the source block
False
Sending 35 to the source block
False
Sending 36 to the source block
False
Sending 37 to the source block
False
Sending 38 to the source block
False
Sending 39 to the source block
False
Sending 40 to the source block
False
Sending 41 to the source block
False
Sending 42 to the source block
False
Sending 43 to the source block
False
Sending 44 to the source block
False
Sending 45 to the source block
False
Sending 46 to the source block
False
Sending 47 to the source block
False
Sending 48 to the source block
False
Sending 49 to the source block
False
Sending 50 to the source block
False
Sending 51 to the source block
False
Sending 52 to the source block
False
Sending 53 to the source block
False
Sending 54 to the source block
False
Sending 55 to the source block
False
Sending 56 to the source block
False
Sending 57 to the source block
False
Sending 58 to the source block
False
Sending 59 to the source block
False
Finished sending data to the source block
Received:
10 10