生产者消费者队列最佳实践和性能

问题描述 投票:0回答:1

我正在用 C# 构建一个生产者消费者队列,并且正在阅读以寻找稳健性和性能方面的最佳方法。 多年来我一直使用

BlockingCollection
,但我发现了
TPLDataflow
Channels

我一直在做一些基准测试,我发现 TPL 和通道在使元素出队方面要快得多。

我的要求是:

  • 队列行为(维持项目排序)
  • 多个线程可以将元素入队
  • 一个线程读取元素(以维持顺序)

IProducerConsumer
界面:

public interface IProducerConsumer
{
    void Enqueue(Action item);
    void Stop();
    void StartDequeing();
}

阻塞收集实现:

public class BlockingCollectionQueue : IProducerConsumer
{
    private readonly BlockingCollection<Action> _defaultQueue;
    private Task _dequeTask;

    public BlockingCollectionQueue()
    {
        _defaultQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
    }

    public void Enqueue(Action item)
    {
        if (!_defaultQueue.IsAddingCompleted)
        {
            _defaultQueue.Add(item);
        }
    }

    public void Stop()
    {
        _defaultQueue.CompleteAdding();
    }

    public void StartDequeing()
    {
        Task.Run(DequeueTask);
    }

    private void DequeueTask()
    {
        foreach (var item in _defaultQueue.GetConsumingEnumerable())
        {
            item?.Invoke();
        }
    }
}

渠道实施:

public class ChannelQueue : IProducerConsumer
{
    private readonly Channel<Action> _channel;
    private readonly ChannelWriter<Action> _channelWriter;
    private readonly ChannelReader<Action> _channelReader;

    public ChannelQueue()
    {
        _channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
        _channelWriter = _channel.Writer;
        _channelReader = _channel.Reader;
    }

    public void Enqueue(Action item)
    {
        _channelWriter.TryWrite(item);
    }

    public void StartDequeing()
    {
        Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        while (await _channelReader.WaitToReadAsync())
        {
            while (_channelReader.TryRead(out var job))
            {
                job?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _channelWriter.Complete();
    }
}

TPLDataFlow 使用

BufferBlock
实现:

public class DataFlowQueue : IProducerConsumer
{
    private readonly BufferBlock<Action> _bufferBlock;
    private Task _dequeTask;

    public DataFlowQueue()
    {
        var dataflowOptions = new DataflowBlockOptions() { EnsureOrdered = true };
        _bufferBlock = new BufferBlock<Action>(dataflowOptions);
    }

    public void Enqueue(Action item)
    {
        _bufferBlock.Post(item);
    }

    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        while (await _bufferBlock.OutputAvailableAsync())
        {
            while(_bufferBlock.TryReceive(out var item))
            {
                item?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _bufferBlock.Complete();
    }
}

TPLDataFlow 使用

ActionBlock
:

public class ActionBlockQueue : IProducerConsumer
{
    private readonly ActionBlock<Action> _actionBlock;
    private Task _dequeTask;
    
    public ActionBlockQueue()
    {
        var dataflowOptions = new ExecutionDataflowBlockOptions() { EnsureOrdered = true, MaxDegreeOfParallelism = 1 };
        _actionBlock = new ActionBlock<Action>(item=> item?.Invoke(), dataflowOptions);
    }

    public void Enqueue(Action item, QueuePriority priority = QueuePriority.Default)
    {
        _actionBlock.Post(item);
    }

    public void StartDequeing()
    {
    }

    public void Stop()
    {
        _actionBlock.Complete();
    }
}

使用 BenchmarDotNet 进行基准测试

正如您所看到的,所有实现都将

Action
存储在队列中,并且我使用
AutoResetEvent
在最后一个元素出队时发出信号。

public class MultipleJobBenchMark
{
    private AutoResetEvent _autoResetEvent;

    public MultipleJobBenchMark()
    {
        _autoResetEvent = new AutoResetEvent(false);
    }

    [Benchmark]
    public void BlockingCollectionQueue()
    {
        DoMultipleJobs(new BlockingCollectionQueue());
    }

    [Benchmark]
    public void DataFlowQueue()
    {
        DoMultipleJobs(new DataFlowQueue());
    }

    [Benchmark]
    public void ActionBlockQueue()
    {
        DoMultipleJobs(new ActionBlockQueue());
    }

    [Benchmark]
    public void ChannelQueue()
    {
        DoMultipleJobs(new ChannelQueue());
    }

    private void DoMultipleJobs(IProducerConsumer producerConsumerQueue)
    {
        producerConsumerQueue.StartDequeing();
        int jobs = 100000;

        for (int i = 0; i < jobs - 1; i++)
        {
            producerConsumerQueue.Enqueue(() => { });
        }

        producerConsumerQueue.Enqueue(() => _autoResetEvent.Set());
        _autoResetEvent.WaitOne();
        producerConsumerQueue.Stop();
    }
}

结果

  • BlockingCollection:平均 21.5 毫秒
  • BufferBlock 队列:平均 14.937ms
  • 动作块队列:6.007ms
  • 通道:4.781ms

问题与结论

通过这个练习,我发现此时使用

BlockingCollection
可能不是最好的选择。

我不明白为什么

BufferBlock
ActionBlock
之间有这么大的区别。我已经完成了这两种实现,因为在我的接口中我定义了
StartDequeue()
方法,并且使用
ActionBlock
这是不可能的,因为出队是在
ActionBlock
构造中完成的。

我使用 BufferBlock 的实现是最好的吗?

我想在这里发布我的结果,看看目前哪个是最受接受的生产者消费者队列,以及为什么我看到

ActionBlock
BufferBlock

之间有如此大的差异
c# performance task-parallel-library producer-consumer blockingcollection
1个回答
3
投票

正如您的基准测试所示,

Channel<T>
是一个比
BlockingCollection<T>
性能相对更高的生产者消费者队列。这是合理的,因为
Channel<T>
是较新的组件(2019 年),并且利用了引入
ValueTask<T>
时不存在的
BlockingCollection<T>
技术(2010 年)。为了产生任何可测量的效果,您必须每秒通过队列传递疯狂的许多项目。在这种情况下,考虑以批次/的方式处理项目可能是个好主意,而不是通过队列单独传递每个项目。

总的来说,我认为当生产者消费者系统同步时,即当生产者和消费者在专用线程上运行时,

BlockingCollection<T>
仍然是一个不错的选择。当您想要构建异步系统时,即您正在调用异步 API 并且您想要有效地利用线程时,
Channel<T>
是一个自然的选择。至于 TPL Dataflow 库中的组件,当您想要构建可以在旧版本的 .NET 上运行的异步系统时,它们是一个有效的选择。当两者都可用时,没有什么理由更喜欢旧的
BufferBlock<T>
而不是新的
Channel<T>
Channel<T>
具有更清晰、更具表现力的 API,并提供更多选项。就像删除旧物品的能力一样,当添加新物品并且达到最大容量时。

您可能希望避免

Channel<T>
的一种罕见情况是,您的生产者或消费者或两者都在每个异步写入/读取操作中使用取消令牌,而这些操作通常会被取消。此用法可能会在
Channel<T>
中触发内存泄漏,但不会在
BufferBlock<T>
中触发内存泄漏。有关详细信息,请参阅此问题

© www.soinside.com 2019 - 2024. All rights reserved.