我正在用 C# 构建一个生产者消费者队列,并且正在阅读以寻找稳健性和性能方面的最佳方法。 多年来我一直使用
BlockingCollection
,但我发现了 TPLDataflow
和 Channels
。
我一直在做一些基准测试,我发现 TPL 和通道在使元素出队方面要快得多。
我的要求是:
IProducerConsumer
界面:
public interface IProducerConsumer
{
void Enqueue(Action item);
void Stop();
void StartDequeing();
}
阻塞收集实现:
public class BlockingCollectionQueue : IProducerConsumer
{
private readonly BlockingCollection<Action> _defaultQueue;
private Task _dequeTask;
public BlockingCollectionQueue()
{
_defaultQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
}
public void Enqueue(Action item)
{
if (!_defaultQueue.IsAddingCompleted)
{
_defaultQueue.Add(item);
}
}
public void Stop()
{
_defaultQueue.CompleteAdding();
}
public void StartDequeing()
{
Task.Run(DequeueTask);
}
private void DequeueTask()
{
foreach (var item in _defaultQueue.GetConsumingEnumerable())
{
item?.Invoke();
}
}
}
渠道实施:
public class ChannelQueue : IProducerConsumer
{
private readonly Channel<Action> _channel;
private readonly ChannelWriter<Action> _channelWriter;
private readonly ChannelReader<Action> _channelReader;
public ChannelQueue()
{
_channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
_channelWriter = _channel.Writer;
_channelReader = _channel.Reader;
}
public void Enqueue(Action item)
{
_channelWriter.TryWrite(item);
}
public void StartDequeing()
{
Task.Run(DequeueTask);
}
private async Task DequeueTask()
{
while (await _channelReader.WaitToReadAsync())
{
while (_channelReader.TryRead(out var job))
{
job?.Invoke();
}
}
}
public void Stop()
{
_channelWriter.Complete();
}
}
TPLDataFlow 使用
BufferBlock
实现:
public class DataFlowQueue : IProducerConsumer
{
private readonly BufferBlock<Action> _bufferBlock;
private Task _dequeTask;
public DataFlowQueue()
{
var dataflowOptions = new DataflowBlockOptions() { EnsureOrdered = true };
_bufferBlock = new BufferBlock<Action>(dataflowOptions);
}
public void Enqueue(Action item)
{
_bufferBlock.Post(item);
}
public void StartDequeing()
{
_dequeTask = Task.Run(DequeueTask);
}
private async Task DequeueTask()
{
while (await _bufferBlock.OutputAvailableAsync())
{
while(_bufferBlock.TryReceive(out var item))
{
item?.Invoke();
}
}
}
public void Stop()
{
_bufferBlock.Complete();
}
}
TPLDataFlow 使用
ActionBlock
:
public class ActionBlockQueue : IProducerConsumer
{
private readonly ActionBlock<Action> _actionBlock;
private Task _dequeTask;
public ActionBlockQueue()
{
var dataflowOptions = new ExecutionDataflowBlockOptions() { EnsureOrdered = true, MaxDegreeOfParallelism = 1 };
_actionBlock = new ActionBlock<Action>(item=> item?.Invoke(), dataflowOptions);
}
public void Enqueue(Action item, QueuePriority priority = QueuePriority.Default)
{
_actionBlock.Post(item);
}
public void StartDequeing()
{
}
public void Stop()
{
_actionBlock.Complete();
}
}
使用 BenchmarDotNet 进行基准测试
正如您所看到的,所有实现都将
Action
存储在队列中,并且我使用 AutoResetEvent
在最后一个元素出队时发出信号。
public class MultipleJobBenchMark
{
private AutoResetEvent _autoResetEvent;
public MultipleJobBenchMark()
{
_autoResetEvent = new AutoResetEvent(false);
}
[Benchmark]
public void BlockingCollectionQueue()
{
DoMultipleJobs(new BlockingCollectionQueue());
}
[Benchmark]
public void DataFlowQueue()
{
DoMultipleJobs(new DataFlowQueue());
}
[Benchmark]
public void ActionBlockQueue()
{
DoMultipleJobs(new ActionBlockQueue());
}
[Benchmark]
public void ChannelQueue()
{
DoMultipleJobs(new ChannelQueue());
}
private void DoMultipleJobs(IProducerConsumer producerConsumerQueue)
{
producerConsumerQueue.StartDequeing();
int jobs = 100000;
for (int i = 0; i < jobs - 1; i++)
{
producerConsumerQueue.Enqueue(() => { });
}
producerConsumerQueue.Enqueue(() => _autoResetEvent.Set());
_autoResetEvent.WaitOne();
producerConsumerQueue.Stop();
}
}
结果
问题与结论
通过这个练习,我发现此时使用
BlockingCollection
可能不是最好的选择。
我不明白为什么
BufferBlock
和ActionBlock
之间有这么大的区别。我已经完成了这两种实现,因为在我的接口中我定义了 StartDequeue()
方法,并且使用 ActionBlock
这是不可能的,因为出队是在 ActionBlock
构造中完成的。
我使用 BufferBlock 的实现是最好的吗?
我想在这里发布我的结果,看看目前哪个是最受接受的生产者消费者队列,以及为什么我看到
ActionBlock
和BufferBlock
之间有如此大的差异
正如您的基准测试所示,
Channel<T>
是一个比 BlockingCollection<T>
性能相对更高的生产者消费者队列。这是合理的,因为 Channel<T>
是较新的组件(2019 年),并且利用了引入 ValueTask<T>
时不存在的 BlockingCollection<T>
技术(2010 年)。为了产生任何可测量的效果,您必须每秒通过队列传递疯狂的许多项目。在这种情况下,考虑以批次/块的方式处理项目可能是个好主意,而不是通过队列单独传递每个项目。
总的来说,我认为当生产者消费者系统同步时,即当生产者和消费者在专用线程上运行时,
BlockingCollection<T>
仍然是一个不错的选择。当您想要构建异步系统时,即您正在调用异步 API 并且您想要有效地利用线程时,Channel<T>
是一个自然的选择。至于 TPL Dataflow 库中的组件,当您想要构建可以在旧版本的 .NET 上运行的异步系统时,它们是一个有效的选择。当两者都可用时,没有什么理由更喜欢旧的 BufferBlock<T>
而不是新的 Channel<T>
。 Channel<T>
具有更清晰、更具表现力的 API,并提供更多选项。就像删除旧物品的能力一样,当添加新物品并且达到最大容量时。
您可能希望避免
Channel<T>
的一种罕见情况是,您的生产者或消费者或两者都在每个异步写入/读取操作中使用取消令牌,而这些操作通常会被取消。此用法可能会在 Channel<T>
中触发内存泄漏,但不会在 BufferBlock<T>
中触发内存泄漏。有关详细信息,请参阅此问题。