我有一个关于使用Dataflow TPL库实现流水线的问题。
我的情况是,我有一个软件,需要同时处理一些任务。处理过程是这样的:首先我们在全局级别处理相册,然后我们进入相册内部,单独处理每张图片。假设应用程序有处理槽,并且它们是可配置的(为了举例,假设槽=2)。这意味着应用程序可以处理以下两种情况
a)两个相册同时处理 b)一个相册+一个不同相册的照片 c)同一个相册同时处理两张照片 d)不同相册同时处理两张照片
目前我实现了这样的流程。
var albumTransferBlock = new TransformBlock<Album, Album>(ProcessAlbum,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
ActionBlock<Album> photoActionBlock = new ActionBlock<Album>(ProcessPhoto);
albumTransferBlock.LinkTo(photoActionBlock);
Album ProcessAlbum(Album a)
{
return a;
}
void ProcessPhoto(Album album)
{
foreach (var photo in album)
{
// do some processing
}
}
我的问题是,当我一次处理一个相册时,应用程序永远不会使用两个插槽来处理照片。除了c)之外,它符合所有要求
谁能帮我解决这个问题,使用DataFlow TPL?
我想我可以自己回答。我的做法是
1)我创建了一个接口IProcessor和方法Process()2)用接口IProcessor包装AlbumProcessing和PhotoProcessing3)创建了一个ActionBlock,它把IProcessor作为Input并执行Process方法。
4) 在处理相册的最后,我将所有照片的处理加入到ActionBlock中。
这100%满足了我的要求。也许有人有其他的解决方案?
你可以使用 TransformManyBlock
用于处理相册,连接到一个 ActionBlock
用于处理照片,这样每个相册在处理它的照片之前就被处理了。对于施加超出单个块边界的并发限制,您可以使用有限并发的 TaskScheduler
或 SemaphoreSlim
. 第二种方案更加灵活,因为它也允许对异步操作进行节流。在你的情况下,所有的操作都是同步的,所以你可以自由选择其中一种方法。在这两种情况下,你仍然应该配置 MaxDegreeOfParallelism
选项的块的最大并发量限制,否则--如果您将它们设置为 无限- 处理顺序会变得过于随机。
下面是一个例子 TaskScheduler
办法。它使用的是 ConcurrentScheduler
的财产 ConcurrentExclusiveSchedulerPair
类的例子。
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2,
TaskScheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default,
maxConcurrencyLevel: 2).ConcurrentScheduler
};
var albumsBlock = new TransformManyBlock<Album, Photo>(album =>
{
ProcessAlbum(album);
return album.Photos;
}, options);
var photosBlock = new ActionBlock<Photo>(photo =>
{
ProcessPhoto(photo);
}, options);
albumsBlock.LinkTo(photosBlock);
这里是一个例子 SemaphoreSlim
办法。使用 WaitAsync
方法,而不是 Wait
的优点是,等待获取信号体的过程会发生。异步所以,没有 ThreadPool
线程将被无谓的封锁。
var throttler = new SemaphoreSlim(2);
var albumsBlock = new TransformManyBlock<Album, Photo>(async album =>
{
await throttler.WaitAsync();
try
{
ProcessAlbum(album);
return album.Photos;
}
finally { throttler.Release(); }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
var photosBlock = new ActionBlock<Photo>(async photo =>
{
await throttler.WaitAsync();
try
{
ProcessPhoto(photo);
}
finally { throttler.Release(); }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
albumsBlock.LinkTo(photosBlock);