我正在使用TPL Dataflow建立一个管道。这个流水线在逻辑上应该做到以下几点。
pollingBlock
.monitoringBlock
. 每个 monitoringBlock
只能容纳1件物品,但有多个 monitoringBlocks
.pollingBlock
应该继续处理所有的项目,包括一个发布在 while (true)
的方式。monitoringBlocks
在被占用的情况下,不应该接受任何其他的消息,这些消息应该只是被删除,而不做进一步的处理。monitoringBlock
消息应该被标记为已完成或转移到下一个块进行处理,这个下一个块就是 processingBlock
一个简单的样本。
public Task ExecutePipeline()
{
var block = CreatePollingPipeline();
block.Post((_serviceOne, _serviceTwo));
block.Complete();
return block.Completion;
}
public ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)> CreatePollingPipeline()
{
var pollingAlertHolder = new BufferBlock<(string input1, string input2)>();
var pollingBlock = new ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)>(services =>
{
while (true)
{
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP2", "INPVAL2"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP2", "INPVAL2"));
Thread.Sleep(2000);
}
});
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
{
Console.WriteLine("monitoringBlock started");
Thread.Sleep(5000);
Console.WriteLine("monitoringBlock completed");
return (inputs.input1, inputs.input2);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
pollingAlertHolder.LinkTo(monitoringBlock, new DataflowLinkOptions() { PropagateCompletion = true },
inputs => inputs.input1 == "INP1" && inputs.input2 == "INPVAL");
pollingAlertHolder.LinkTo(DataflowBlock.NullTarget<(string input1, string input2)>());
var processingBlock = new ActionBlock<(string input1, string input2)>(i =>
{
Console.WriteLine("processingBlock started");
Thread.Sleep(2000);
Console.WriteLine("processingBlock completed");
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
monitoringBlock.LinkTo(processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
return pollingBlock;
}
我的问题是,我如何保持 monitoringBlock
占用 processingBlock
完成它的工作?我不希望任何项目被发布到... monitoringBlock
訊息還沒說完 全 处理周期。
正如在评论中已经提到的,你可以简单地封装逻辑的 monitoringBlock
和 processingBlock
在一个区块中,例如,你可以通过预定义的 Datablock.Encapsulate
方法。
但是,如果你不想这样做,你可以使用 AutoResetEvent
或类似的抽象,你的代码可以是这样的。
AutoResetEvent dataflowEvent = new AutoResetEvent(true);
var bufferBlock = new ActionBLock<(string input1, string input2)>(i =>
{
dataflowEvent.WaitOne();
monitoringBlock.Post(i);
});
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
{
Console.WriteLine("monitoringBlock started");
Thread.Sleep(5000);
Console.WriteLine("monitoringBlock completed");
dataflowEvent.Set();
return (inputs.input1, inputs.input2);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });