线程池设计

问题描述 投票:0回答:1

我有一个关于线程池的设计问题。考虑以下代码:

int main() {
  auto po = std::execution::par_unseq;
  
  // If Parallel STL uses TBB as the backend, I think the thread pool
  //   is created during the first time for_each() being called.
  std::for_each(po, begin, end, unaryFun);  
  // Worker threads are put to sleep.
  
  // ... Other things are done by the main thread.
  
  std::for_each(po, begin, end, unaryFun); // Threads wake up and run again.
  
  // ... Other things are done by the main thread.
  
} // Right before going out of scope, how does the thread pool know to destruct
//     itself ? 

TBB曾经出现过内存泄漏问题C++:tbb中的内存泄漏。每当我用消毒剂编译程序时,我都必须设置

export ASAN_OPTIONS=alloc_dealloc_mismatch=0
以避免崩溃。我一直认为泄漏问题正是由于线程池没有被删除而超出了范围。

但是,新版本

oneTBB
不再有这个问题。他们是怎么解决的?我不认为答案是愚蠢的,因为线程池是在每个
for_each()
调用中构造和销毁的。线程池如何知道超出范围会破坏自己?我想将这样的设计应用到其他一些结构上。

非常感谢!

c++ algorithm design-patterns global-variables tbb
1个回答
0
投票

不需要在作用域结束时销毁线程池,可以在任何操作系统上卸载库时执行代码,对于C++,编译器应该在卸载库时调用所有静态对象的析构函数,你只需要把线程池设置为单例即可。

这是此类设计工作的简单实现godbolt demo

#include <thread>
#include <vector>
#include <queue>
#include <future>
#include <optional>
#include <mutex>
#include <condition_variable>
#include <iostream>


class TaskQueue
{
    public:
    void push(std::optional<std::packaged_task<void()>> task)
    {
        {
            std::unique_lock lk{m_mutex};
            m_queue.push(std::move(task));
        }
        m_cv.notify_one();
    }
    std::optional<std::packaged_task<void()>> pop()
    {
        std::optional<std::packaged_task<void()>> task;
        {
            std::unique_lock lk(m_mutex);
            m_cv.wait(lk, [this](){ return !this->m_queue.empty();});
            task = std::move(m_queue.front());
            m_queue.pop();
        }
        return task;
    }
    private:
    std::queue<std::optional<std::packaged_task<void()>>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;

};

class ThreadPool
{
    public:
    static ThreadPool& Instance()
    {
        static ThreadPool pool(std::thread::hardware_concurrency());
        return pool;
    }
    
    template<typename Func>
    std::future<void> push_task(Func&& f)
    {
        std::packaged_task<void()> task{
            [func = std::move(f)] { func(); }
        };
        auto fut = task.get_future();
        m_queue.push(std::move(task));
        return fut;
    }
    void init()
    {
        std::call_once(m_init_flag, [this](){this->Initialize();});
    }

    private:
    ThreadPool(int thread_count)
    : m_thread_count{thread_count}
    {

    }
    void worker_task()
    {
        while (m_running)
        {
            auto task = m_queue.pop();
            if (task)
            {
                task.value()();
            }
            else
            {
                break;
            }
        }
    }
    void Initialize()
    {
        m_running = true;
        for (int i = 0; i < m_thread_count; i++)
        {
            m_workers.push_back(std::thread{[this]{this->worker_task();}});
        }
    }

    ~ThreadPool()
    {
        m_running = false;
        for (int i = 0; i < m_thread_count; i++)
        {
            m_queue.push(std::nullopt);
        }
        for (auto&& worker: m_workers)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
    }
    std::once_flag m_init_flag;
    int m_thread_count;
    TaskQueue m_queue;
    std::vector<std::thread> m_workers;
    bool m_running = false;
};

template<typename BiIter, typename Func>
void foreach(BiIter begin, BiIter end, Func&& func)
{
    std::vector<std::future<void>> futures;
    futures.reserve(std::distance(begin,end));
    auto&& threadpool = ThreadPool::Instance();
    threadpool.init();
    while (begin != end)
    {
        futures.push_back(threadpool.push_task([begin = begin, &func]{ func(*begin);}));
        begin++;
    }
    for (auto&& future: futures)
    {
        future.get();
    }
}

int main()
{
    std::vector<int> vals{1,2,3,4,5};
    foreach(vals.begin(), vals.end(), [](int& value) { value *= 2; });

    for (auto&& value: vals)
    {
        std::cout << value << ' ';
    }
}

通常库会进行额外的优化,例如通过使用无锁结构和分块任务来减少堆分配和减少锁,因此此实现并未经过优化,但它是线程安全的并且不会泄漏。

如果您正在设计自己的线程池,我建议您不要将其设置为单例,而是将其传递或使用资源定位器,然后在您的

main
中创建它,以解决静态构造/销毁的不可控顺序对象,如果有人在创建或销毁另一个静态对象期间尝试使用线程池,则上述代码可能会中断。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.