我想await
异步地对BlockingCollection<T>.Take()
的结果,所以我不阻止线程。寻找这样的事情:
var item = await blockingCollection.TakeAsync();
我知道我可以这样做:
var item = await Task.Run(() => blockingCollection.Take());
但是有点杀死了整个想法,因为另一个(ThreadPool
)线程被阻止了。
还有其他选择吗?
我知道有四种选择。
第一个是Channels,它提供了一个支持异步Read
和Write
操作的线程安全队列。通道经过高度优化,并且可选择在达到阈值时支持丢弃某些项目。
接下来是来自BufferBlock<T>
的TPL Dataflow。如果您只有一个消费者,您可以使用OutputAvailableAsync
或ReceiveAsync
,或者只是将其链接到ActionBlock<T>
。有关更多信息,请访问see my blog。
最后两个是我创建的类型,可以在我的AsyncEx library中找到。
AsyncCollection<T>
是async
,相当于BlockingCollection<T>
,能够包装同时生产者/消费者的集合,如ConcurrentQueue<T>
或ConcurrentBag<T>
。您可以使用TakeAsync
异步使用集合中的项目。有关更多信息,请访问see my blog。
AsyncProducerConsumerQueue<T>
是一个更便携的async
兼容的生产者/消费者队列。您可以使用DequeueAsync
异步使用队列中的项目。有关更多信息,请访问see my blog。
这些替代方案的最后三个允许同步和异步put和take。
......或者你可以这样做:
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class AsyncQueue<T>
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentQueue<T> _que;
public AsyncQueue()
{
_sem = new SemaphoreSlim(0);
_que = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
_que.Enqueue(item);
_sem.Release();
}
public void EnqueueRange(IEnumerable<T> source)
{
var n = 0;
foreach (var item in source)
{
_que.Enqueue(item);
n++;
}
_sem.Release(n);
}
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
{
for (; ; )
{
await _sem.WaitAsync(cancellationToken);
T item;
if (_que.TryDequeue(out item))
{
return item;
}
}
}
}
简单,功能齐全的异步FIFO队列。
注意:之前在.NET 4.5中添加了
SemaphoreSlim.WaitAsync
,这并不是那么简单。
这是BlockingCollection
的一个非常基本的实现,支持等待,具有许多缺少的功能。它使用着名的AsyncEnumerable
类,它将在C#8(引入异步流)发布后变得过时,但仅适用于.NET Core 3.0。 .NET Framework不会进行此升级,因此AsyncEnumerable
将保持部分有用。
public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
private Queue<T> _queue = new Queue<T>();
private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
private int _consumersCount = 0;
private bool _isAddingCompleted;
public void Add(T item)
{
lock (_queue)
{
if (_isAddingCompleted) throw new InvalidOperationException();
_queue.Enqueue(item);
}
_semaphore.Release();
}
public void CompleteAdding()
{
lock (_queue)
{
if (_isAddingCompleted) return;
_isAddingCompleted = true;
_semaphore.Release(_consumersCount);
}
}
public IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
return new AsyncEnumerable<T>(async yield =>
{
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) await yield.ReturnAsync(item);
}
});
}
}
用法示例:
var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(100);
abc.Add(i);
}
abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
await abc.GetConsumingEnumerable().ForEachAsync(async item =>
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
});
});
await Task.WhenAll(producer, consumer);
输出:
1 2 3 4 5 6 7 8 9 10