我正在开发一个异步任务处理器。我需要高性能的处理器,所以使用的同步基元应该尽可能的低级。处理器应该拥有一个线程,当没有任务时,这个线程会睡觉,当任务出现时,这个线程会被唤醒。任务处理和任务添加应该在不同的线程中进行。
我尝试用 AutoResetEvent
但它有竞赛条件。
public class Processor
{
ConcurrentQueue<Action> _workItemQueue = new ConcurrentQueue<Action>();
AutoResetEvent _newWorkItemAutoResetEvent = new AutoResetEvent(false);
private bool _disposed;
Thread _thread;
public void Do(Action action)
{
_workItemQueue.Enqueue(action);
_newWorkItemAutoResetEvent.Set();
}
public Processor()
{
_workerThread = new Thread(() =>
{
while (!_disposed)
{
_newWorkItemAutoResetEvent.WaitOne(); //
while (_workItemQueue.TryDequeue(out Action action))
{
action();
}
// at this "bad" moment another thread calls Do method.
// New action has been enqueued, but when we call
// _newWorkIteManualAutoEvent.WaitOne() we fall asleep.
}
});
_thread.Start();
}
}
然后我试着用 ManualResetEvent
:
public class Processor
{
ConcurrentQueue<Action> _workItemQueue = new ConcurrentQueue<Action>();
ManualResetEventSlim _newWorkItemManualResetEvent = new ManualResetEventSlim(false);
private bool _disposed;
Thread _thread;
public void Do(Action action)
{
_workItemQueue.Enqueue(action);
_newWorkItemManualResetEvent.Set();
}
public Processor()
{
_workerThread = new Thread(() =>
{
while (!_disposed)
{
_newWorkItemManualResetEvent.WaitOne();
_newWorkItemManualResetEvent.Reset();
while (_workItemQueue.TryDequeue(out Action action))
{
action();
}
}
});
_thread.Start();
}
}
我没有看到在执行中出现任何竞赛条件。ManualResetEvent
.
问题:我说的对吗?还是我需要另一个同步基元?我想的是Count攲事件(反面) CountdownEvent
). 当其计数大于零时发出信号,当其计数等于零时不发出信号。计数攲事件的计数对应于要执行的任务的计数。
方便的 阻止收集 会为你处理大部分的事情。
就像..:
public sealed class Processor : IDisposable
{
//set a max queue depth to provide back pressure to the request rate
BlockingCollection<Action> _workItemQueue = new BlockingCollection<Action>(32);
private bool _disposed = false;
private Thread _workerThread;
private CancellationTokenSource _cancelTokenSource = new CancellationTokenSource();
public void Do(Action action)
{
_workItemQueue.Add(action);
}
public void Dispose()
{
if (!_disposed)
{
_workItemQueue.CompleteAdding();
_cancelTokenSource.Cancel();
_disposed = true;
}
}
public Processor()
{
_workerThread = new Thread(() =>
{
while (!_workItemQueue.IsCompleted)
{
if (_workItemQueue.TryTake(out Action action, 1000*2,_cancelTokenSource.Token))
{
action();
}
}
});
_workerThread.Start();
}
}