从C ++ 11线程安全地向向量添加元素

问题描述 投票:0回答:1

我的程序需要生成大量的示例字符串,并且由于生成字符串是计算密集型的,我想并行化该过程。我的代码是这样的:

mutex mtx;

void my_thread(vector<string> &V, int length)
{
     string s=generate_some_string(length);  //computationally intensive part

      mtx.lock();
       V.push_back(s);
      mtx.unlock();


}

int main()
{
   vector<string> S;

   while(S.size()<1000)
  {
    vector<thread> ths;
    ths.resize(10);

    for(int i=0; i<10;i++)
    {
       ths[i]=thread(my_thread,ref(S),100 );
    }

    for(auto &th: ths)  th.join();


  }  


}

我运行它时出现“Double free or corruption”错误。

c++ multithreading c++11 vector memory-management
1个回答
0
投票

你的代码

您对线程的使用看起来通常是正确的,因此generate_some_string可能会影响全局状态。您可以通过以下方式解决此问题

  • 使用更好的库。
  • 使用MPI进行并行操作,因为它会产生具有独立存储器的进程。

并行哲学

回想起来,上述情况似乎很明显,因此有一个问题是为什么它不会立即显现出来。我认为这与你实现并行性的方式有关。

C ++ 11线程为您提供了很大的灵活性,但它还要求您明确地构建并行性。大多数时候这不是你想要的。为编译器提供有关如何并行化代码并让它处理低级细节的信息,这样更容易,也更少有错误。

下面介绍如何使用OpenMP执行此操作:所有现代编译器中包含的行业标准编译器指令集,广泛用于高性能计算。

您会注意到代码通常比您编写的代码更容易阅读,因此更容易调试。

下面的所有代码都将使用命令进行编译(适合您的编译器进行修改:

g++ -O3 main.cpp -fopenmp

解决方案0:使用更简单的并行方式

首先,我建议使用OpenMP来实现并行性。它是一个行业标准,消除了必须处理线程的许多痛苦,并允许您在概念级别表达并行性。

解决方案1:私人记忆

您可以通过让每个线程写入其自己的私有内存然后将私有内存合并在一起来解决您的问题。这完全避免了互斥锁,这可能会导致代码更快,并可能避免您遇到的问题。

请注意,每个线程都会产生多个计算密集型字符串,但这项工作会在可用线程之间自动划分。它是

#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>

const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH  = 50;

using namespace std::chrono_literals;

//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
  double sum=0;
  for(int i=0;i<length;i++){
    std::this_thread::sleep_for(2ms);
    sum+=std::sqrt(i);
  }
  return std::to_string(sum);
}

int main(){
  //Build a vector that contains vectors of strings. Each thread will have its
  //own vector of strings
  std::vector< std::vector<std::string> > vecs(omp_get_max_threads());

  //Loop over lengths
  for(int length=10;length<MAX_STRING_LENGTH;length++){
    //Progress so the user does not get impatient
    std::cout<<length<<std::endl;
    //Parallelize across all cores
    #pragma omp parallel for
    for(int i=0;i<STRINGS_PER_LENGTH;i++){
      //Each thread independently generates its string and puts it into its own
      //private memory space
      vecs[omp_get_thread_num()].push_back(GenerateSomeString(length));
    }
  }

  //Merge all the threads' results together
  std::vector<std::string> S;
  for(auto &v: vecs)
    S.insert(S.end(),v.begin(),v.end());

  //Throw away the thread private memory
  vecs.clear();
  vecs.shrink_to_fit();
}

解决方案2:使用减少量

我们可以定义一个自定义缩减运算符来合并向量。在代码的并行部分中使用此运算符允许我们消除向量的向量和之后的清理。相反,当线程完成其工作时,OpenMP会安全地处理它们的结果。

#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>

using namespace std::chrono_literals;

const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH  = 50;    

//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
  double sum=0;
  for(int i=0;i<length;i++){
    std::this_thread::sleep_for(2ms);
    sum+=std::sqrt(i);
  }
  return std::to_string(sum);
}

int main(){
  //Global vector, must not be accessed by individual threads
  std::vector<std::string> S;

  #pragma omp declare reduction (merge : std::vector<std::string> : omp_out.insert(omp_out.end(), omp_in.begin(), omp_in.end()))

  //Loop over lengths
  for(int length=10;length<50;length++){
    //Progress so the user does not get impatient
    std::cout<<length<<std::endl;
    //Parallelize across all cores
    std::vector<std::string> private_memory;
    #pragma omp parallel for reduction(merge: private_memory)
    for(int i=0;i<STRINGS_PER_LENGTH;i++){
      //Each thread independently generates its string and puts it into its own
      //private memory space
      private_memory.push_back(GenerateSomeString(length));
    }
  }
}

解决方案3:使用critical

我们可以通过将push_back放入一个关键部分来完全消除这种减少,这一部分将访问该部分代码限制为一次一个线程。

//Compile with g++ -O3 main.cpp -fopenmp
#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>

using namespace std::chrono_literals;

const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH  = 50;    

//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
  double sum=0;
  for(int i=0;i<length;i++){
    std::this_thread::sleep_for(2ms);
    sum+=std::sqrt(i);
  }
  return std::to_string(sum);
}

int main(){
  //Global vector, must not be accessed by individual threads
  std::vector<std::string> S;

  //Loop over lengths
  for(int length=10;length<50;length++){
    //Progress so the user does not get impatient
    std::cout<<length<<std::endl;
    //Parallelize across all cores
    #pragma omp parallel for
    for(int i=0;i<STRINGS_PER_LENGTH;i++){
      //Each thread independently generates its string and puts it into its own
      //private memory space
      const auto temp = GenerateSomeString(length);
      //Only one thread can access this part of the code at a time
      #pragma omp critical
      S.push_back(temp);
    }
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.