C++ Concurrency In Action 中的并行快速排序示例中的潜在错误

问题描述 投票:0回答:1

我一直在阅读 Anthony Williams 所著的《C++ Concurrency In Action》(第二版)。在第 8 章第 855 页的代码清单 8.1 中,有一个并发实现快速排序的示例代码。我想知道这段代码是否包含错误,特别是在

threads
类的
sorter
成员中,它是一个普通的非线程安全向量。我的理解是
do_sort
函数将新项目推送到
sorter::threads
并生成一个递归调用
do_sort
的新线程。那么,我们是否不会在没有任何数据竞争保护的情况下从多个并发线程中推动
sorter::threads

我检查了本书网页中的勘误部分,没有找到提到这个问题的内容。我也尝试联系作者但没有收到回复。

为方便起见,将有问题的代码复制如下:

template<typename T>
struct sorter
{
    struct chunk_to_sort
    {
        std::list<T> data;
        std::promise<std::list<T> > promise;
    };

    thread_safe_stack<chunk_to_sort> chunks; 
    std::vector<std::thread> threads;
    unsigned const max_thread_count;
    std::atomic<bool> end_of_data;

    sorter():
        max_thread_count(std::thread::hardware_concurrency()-1),
        end_of_data(false)
    {}

    ~sorter()
    {
        end_of_data=true;
        for(unsigned i=0;i<threads.size();++i)
        {
            threads[i].join();
        }
    }

    void try_sort_chunk()
    {
        boost::shared_ptr<chunk_to_sort > chunk=chunks.pop();
        if(chunk)
        {
        sort_chunk(chunk);
        }
    }

    std::list<T> do_sort(std::list<T>& chunk_data)
    {
        if(chunk_data.empty())
        {
            return chunk_data;
        }
        std::list<T> result;
        result.splice(result.begin(),chunk_data,chunk_data.begin());
        T const& partition_val=*result.begin();

        typename std::list<T>::iterator divide_point=
            std::partition(chunk_data.begin(),chunk_data.end(),
                           [&](T const& val){return val<partition_val;});

        chunk_to_sort new_lower_chunk;
        new_lower_chunk.data.splice(new_lower_chunk.data.end(),
                                    chunk_data,chunk_data.begin(),
                                    divide_point);

        std::future<std::list<T> > new_lower=
            new_lower_chunk.promise.get_future();
        chunks.push(std::move(new_lower_chunk));
        if(threads.size()<max_thread_count)
        {
            threads.push_back(std::thread(&sorter<T>::sort_thread,this));
        }

        std::list<T> new_higher(do_sort(chunk_data));

        result.splice(result.end(),new_higher);
        while(new_lower.wait_for(std::chrono::seconds(0)) !=
              std::future_status::ready)
        {
            try_sort_chunk();
        }

        result.splice(result.begin(),new_lower.get());
        return result;
    }

    void sort_chunk(boost::shared_ptr<chunk_to_sort > const& chunk)
    {
        chunk->promise.set_value(do_sort(chunk->data));
    }

    void sort_thread()
    {
        while(!end_of_data)
        {
            try_sort_chunk();
            std::this_thread::yield();
        }
    }
};

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if(input.empty())
    {
        return input;
    }
    sorter<T> s;
    return s.do_sort(input);
}
c++ multithreading concurrency thread-safety quicksort
1个回答
0
投票

我已经运行了相同的代码进行一些实验。您很可能会出现意想不到的行为。

© www.soinside.com 2019 - 2024. All rights reserved.