有没有像异步BlockingCollection ?

问题描述 投票:63回答:3

我想await异步地对BlockingCollection<T>.Take()的结果,所以我不阻止线程。寻找这样的事情:

var item = await blockingCollection.TakeAsync();

我知道我可以这样做:

var item = await Task.Run(() => blockingCollection.Take());

但是有点杀死了整个想法,因为另一个(ThreadPool)线程被阻止了。

还有其他选择吗?

c# .net collections task-parallel-library async-await
3个回答
64
投票

我知道有四种选择。

第一个是Channels,它提供了一个支持异步ReadWrite操作的线程安全队列。通道经过高度优化,并且可选择在达到阈值时支持丢弃某些项目。

接下来是来自BufferBlock<T>TPL Dataflow。如果您只有一个消费者,您可以使用OutputAvailableAsyncReceiveAsync,或者只是将其链接到ActionBlock<T>。有关更多信息,请访问see my blog

最后两个是我创建的类型,可以在我的AsyncEx library中找到。

AsyncCollection<T>async,相当于BlockingCollection<T>,能够包装同时生产者/消费者的集合,如ConcurrentQueue<T>ConcurrentBag<T>。您可以使用TakeAsync异步使用集合中的项目。有关更多信息,请访问see my blog

AsyncProducerConsumerQueue<T>是一个更便携的async兼容的生产者/消费者队列。您可以使用DequeueAsync异步使用队列中的项目。有关更多信息,请访问see my blog

这些替代方案的最后三个允许同步和异步put和take。


13
投票

......或者你可以这样做:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

简单,功能齐全的异步FIFO队列。

注意:之前在.NET 4.5中添加了SemaphoreSlim.WaitAsync,这并不是那么简单。


1
投票

这是BlockingCollection的一个非常基本的实现,支持等待,具有许多缺少的功能。它使用着名的AsyncEnumerable类,它将在C#8(引入异步流)发布后变得过时,但仅适用于.NET Core 3.0。 .NET Framework不会进行此升级,因此AsyncEnumerable将保持部分有用。

public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
    private Queue<T> _queue = new Queue<T>();
    private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private int _consumersCount = 0;
    private bool _isAddingCompleted;

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_isAddingCompleted) throw new InvalidOperationException();
            _queue.Enqueue(item);
        }
        _semaphore.Release();
    }

    public void CompleteAdding()
    {
        lock (_queue)
        {
            if (_isAddingCompleted) return;
            _isAddingCompleted = true;
            _semaphore.Release(_consumersCount);
        }
    }

    public IAsyncEnumerable<T> GetConsumingEnumerable()
    {
        lock (_queue) _consumersCount++;
        return new AsyncEnumerable<T>(async yield =>
        {
            while (true)
            {
                lock (_queue)
                {
                    if (_queue.Count == 0 && _isAddingCompleted) break;
                }
                await _semaphore.WaitAsync();
                bool hasItem;
                T item = default;
                lock (_queue)
                {
                    hasItem = _queue.Count > 0;
                    if (hasItem) item = _queue.Dequeue();
                }
                if (hasItem) await yield.ReturnAsync(item);
            }
        });
    }
}

用法示例:

var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(100);
        abc.Add(i);
    }
    abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
    await abc.GetConsumingEnumerable().ForEachAsync(async item =>
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    });
});
await Task.WhenAll(producer, consumer);

输出:

1 2 3 4 5 6 7 8 9 10

© www.soinside.com 2019 - 2024. All rights reserved.