我正在为回测器设计一个快速的 csv 阅读器。为了读取 csv 文件,我首先将其读入字符串。此后,我创建多个线程来解析数据并将其存储到我正在使用的表结构中。
每个线程接收开始和结束索引,并对字符串的该部分进行操作。 当使用 1 个线程时,我得到的运行时间为 6.5 秒。此后,当我使用 2 或 4 个线程时,运行时间几乎呈线性增加。我的系统上有 8 个核心。
我也附加了我的工作人员功能。 “lines”属性是一个类属性,它是一个字符串向量,每个字符串都是 csv 的一行。
void read_csv_worker(pair<int,int>t){
auto [start,end] = t;
vector<pair<string,double>>vec(num_cols - 1); // timestamp also a column but stored explicitly
string timestamp, curr;
int curr_index;
int curr_col;
stringstream line_stream;
for(int i=start;i<end;i++){
timestamp.clear();
line_stream.str(lines[i]);
line_stream.clear();
curr_index = 0;
curr_col = 0;
while (getline(line_stream, curr, ',')) {
if (curr_index == timestamp_col_num)
timestamp = curr.substr(1, curr.size()-2);
else
vec[curr_col++] = {headers[curr_index], stod(curr)};
++curr_index;
}
this->insert(vec, timestamp, i);
}
}
我在另一个类函数中创建线程的地方:
int curr_pos = 0;
int chunk_size = num_lines/num_threads;
for(int i=0;i<num_threads-1;i++){
pair<int,int>t = {curr_pos, curr_pos + chunk_size};
threads[i] = std::thread([this, t]() {
read_csv_worker(t); // Now 'this' is available to the member function
});
curr_pos += chunk_size;
}
pair<int,int>t = {curr_pos, num_lines};
threads[num_threads-1] = std::thread([this, t]() {
read_csv_worker(t);
});
for(int i=0;i<num_threads;i++) threads[i].join();
任何帮助或建议将不胜感激。谢谢!
尝试对数据进行切片并将其作为参数传递给线程,因为我认为可能存在对lines类属性的争用。没有帮助。
你没有展示
insert()
的身体,但我敢打赌this->insert(vec, timestamp, i);
电话就是罪魁祸首。您必须在函数内使用某种线程同步,可能获取互斥锁。当您对每个 CSV 行执行此操作时,这是非常昂贵的。
在改进并行化方案之前,您应该找到一种更有效的方法来标记字符串(即用逗号分隔)。使用
stingstream
是一种方便但非常慢的方法。