我正在研究一个线程池,并遇到一个关于条件变量和互斥量的奇怪问题。我怀疑可能存在锁定问题,因为它有时会起作用,有时则不然。这是代码的相关部分(删除了非相关位):
class ThreadPool {
private:
std::atomic<bool> running;
std::atomic<size_t> unfinished_tasks;
std::queue<std::function<void(void)>> task_queue;
std::condition_variable cv_work;
std::mutex mtx_queue;
std::vector<std::thread> threads;
public:
ThreadPool(size_t num_threads = std::thread::hardware_concurrency());
~ThreadPool();
template<class T, class Fn>
std::future<T> queueTask(Fn&& fn);
};
ThreadPool::ThreadPool(size_t num_threads) :
running(true), unfinished_tasks(0) {
auto thread_loop = [&] {
while (running.load()) {
std::unique_lock<std::mutex> lock(mtx_queue);
if (!task_queue.empty()) {
auto work = task_queue.front();
task_queue.pop();
lock.unlock();
work();
unfinished_tasks--;
} else {
std::cout << std::this_thread::get_id() << " going to sleep..." << std::endl;
cv_work.wait(lock);
}
}};
threads.reserve(num_threads);
for (size_t i = 0; i < num_threads; i++) {
threads.push_back(std::thread(thread_loop));
}
}
template<class T, class Fn>
inline std::future<T> ThreadPool::queueTask(Fn&& fn) {
// func = lambda containing packaged task with fn
mtx_queue.lock();
task_queue.push(func);
mtx_queue.unlock();
unfinished_tasks++;
cv_work.notify_one();
return future;
}
一旦我注释掉包含调试输出的行,向线程池添加许多小任务就会使它在某个时刻锁定,调试输出到位后,它将正确完成所有任务。我不确定这里的问题在哪里。
你有竞争条件。 queueTask
可以在你的线程函数等待之前通知cv_work
。在你打电话给mtx_queue
之前不要解锁cv_work.notify_one()
。