我有一个关于线程池的设计问题。考虑以下代码:
int main() {
auto po = std::execution::par_unseq;
// If Parallel STL uses TBB as the backend, I think the thread pool
// is created during the first time for_each() being called.
std::for_each(po, begin, end, unaryFun);
// Worker threads are put to sleep.
// ... Other things are done by the main thread.
std::for_each(po, begin, end, unaryFun); // Threads wake up and run again.
// ... Other things are done by the main thread.
} // Right before going out of scope, how does the thread pool know to destruct
// itself ?
TBB曾经出现过内存泄漏问题C++:tbb中的内存泄漏。每当我用消毒剂编译程序时,我都必须设置
export ASAN_OPTIONS=alloc_dealloc_mismatch=0
以避免崩溃。我一直认为泄漏问题正是由于线程池没有被删除而超出了范围。
但是,新版本
oneTBB
不再有这个问题。他们是怎么解决的?我不认为答案是愚蠢的,因为线程池是在每个 for_each()
调用中构造和销毁的。线程池如何知道超出范围会破坏自己?我想将这样的设计应用到其他一些结构上。
非常感谢!
不需要在作用域结束时销毁线程池,可以在任何操作系统上卸载库时执行代码,对于C++,编译器应该在卸载库时调用所有静态对象的析构函数,你只需要把线程池设置为单例即可。
这是此类设计工作的简单实现godbolt demo
#include <thread>
#include <vector>
#include <queue>
#include <future>
#include <optional>
#include <mutex>
#include <condition_variable>
#include <iostream>
class TaskQueue
{
public:
void push(std::optional<std::packaged_task<void()>> task)
{
{
std::unique_lock lk{m_mutex};
m_queue.push(std::move(task));
}
m_cv.notify_one();
}
std::optional<std::packaged_task<void()>> pop()
{
std::optional<std::packaged_task<void()>> task;
{
std::unique_lock lk(m_mutex);
m_cv.wait(lk, [this](){ return !this->m_queue.empty();});
task = std::move(m_queue.front());
m_queue.pop();
}
return task;
}
private:
std::queue<std::optional<std::packaged_task<void()>>> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
};
class ThreadPool
{
public:
static ThreadPool& Instance()
{
static ThreadPool pool(std::thread::hardware_concurrency());
return pool;
}
template<typename Func>
std::future<void> push_task(Func&& f)
{
std::packaged_task<void()> task{
[func = std::move(f)] { func(); }
};
auto fut = task.get_future();
m_queue.push(std::move(task));
return fut;
}
void init()
{
std::call_once(m_init_flag, [this](){this->Initialize();});
}
private:
ThreadPool(int thread_count)
: m_thread_count{thread_count}
{
}
void worker_task()
{
while (m_running)
{
auto task = m_queue.pop();
if (task)
{
task.value()();
}
else
{
break;
}
}
}
void Initialize()
{
m_running = true;
for (int i = 0; i < m_thread_count; i++)
{
m_workers.push_back(std::thread{[this]{this->worker_task();}});
}
}
~ThreadPool()
{
m_running = false;
for (int i = 0; i < m_thread_count; i++)
{
m_queue.push(std::nullopt);
}
for (auto&& worker: m_workers)
{
if (worker.joinable())
{
worker.join();
}
}
}
std::once_flag m_init_flag;
int m_thread_count;
TaskQueue m_queue;
std::vector<std::thread> m_workers;
bool m_running = false;
};
template<typename BiIter, typename Func>
void foreach(BiIter begin, BiIter end, Func&& func)
{
std::vector<std::future<void>> futures;
futures.reserve(std::distance(begin,end));
auto&& threadpool = ThreadPool::Instance();
threadpool.init();
while (begin != end)
{
futures.push_back(threadpool.push_task([begin = begin, &func]{ func(*begin);}));
begin++;
}
for (auto&& future: futures)
{
future.get();
}
}
int main()
{
std::vector<int> vals{1,2,3,4,5};
foreach(vals.begin(), vals.end(), [](int& value) { value *= 2; });
for (auto&& value: vals)
{
std::cout << value << ' ';
}
}
通常库会进行额外的优化,例如通过使用无锁结构和分块任务来减少堆分配和减少锁,因此此实现并未经过优化,但它是线程安全的并且不会泄漏。
如果您正在设计自己的线程池,我建议您不要将其设置为单例,而是将其传递或使用资源定位器,然后在您的
main
中创建它,以解决静态构造/销毁的不可控顺序对象,如果有人在创建或销毁另一个静态对象期间尝试使用线程池,则上述代码可能会中断。