我正在用 C++ 构建一个回溯测试系统。我面临的问题是,当我在没有 valgrind 的情况下运行该程序时,或者在使用 valgrind 且主函数中的 while 循环中的
cout << "running\n";
时,程序运行良好。但是当我删除它时,valgrind 似乎冻结在循环中,当我用 Ctrl C 强制退出它时,这就是我得到的:
==112929== Process terminating with default action of signal 2 (SIGINT)
==112929== at 0x10B724: __gthread_mutex_lock(pthread_mutex_t*) (gthr-default.h:748)
==112929== by 0x10B78D: std::mutex::lock() (std_mutex.h:113)
==112929== by 0x10C0FB: std::lock_guard<std::mutex>::lock_guard(std::mutex&) (std_mutex.h:249)
==112929== by 0x10B59F: getNextKLine() (dataparser.cpp:85)
==112929== by 0x10A601: main (main.cpp:25)
我不知道为什么会发生这种情况,并且它已通过 cout 线修复。 这是程序:
主.cpp
#include "utils/dataparser.hpp"
#include <bits/stdc++.h>
#include <cstdio>
#include <ctime>
#include <thread>
#include <vector>
using namespace std;
int main(int argc, char *argv[]) {
cin.tie(NULL);
FILE *klines = fopen("/home/g0dz/schoolsj/prj1/final-prj/formalized-data/"
"BTCUSDT-1s-2024-12-03.csv",
"r");
FILE *aggTrades = fopen("/home/g0dz/schoolsj/prj1/final-prj/formalized-data/"
"BTCUSDT-aggTrades-2024-12-03.csv",
"r");
FILE *trades = fopen("/home/g0dz/schoolsj/prj1/final-prj/formalized-data/"
"BTCUSDT-trades-2024-12-03.csv",
"r");
initParser(klines, aggTrades, trades);
int idx = 0;
while (1) {
cout << "running\n";
auto tmp = getNextKLine();
if (tmp.has_value())
cout << idx++ << ": " << tmp.value().open_time << '\n';
else if (isDone())
break;
}
}
配置解析器.hpp
#include <cstdint>
#include <cstdio>
#include <optional>
#include <vector>
typedef struct {
int64_t id, time;
double price, quantity, quote_quantity;
bool maker, best_match;
} Trade;
typedef struct {
int64_t id, first_id, last_id, timestamp;
double price, quantity;
bool maker, best_match;
std::vector<Trade> trades;
} AggTrade;
typedef struct {
int64_t open_time, close_time;
double open, high, low, close, volumn, quote_vol, taker_base_vol,
taker_quote_vol;
short number_of_trade;
std::vector<AggTrade> agg_trades;
} KLine;
void initParser(FILE *, FILE *, FILE *);
std::optional<KLine> getNextKLine();
std::optional<std::vector<KLine>> getKLineInRange(int64_t, int64_t);
bool isDone();
配置解析器.cpp
#include "dataparser.hpp"
#include <cstdint>
#include <iostream>
#include <mutex>
#include <optional>
#include <thread>
FILE *k, *tr, *at;
std::vector<KLine> data_frames;
std::mutex mtx;
bool done = false;
// super slow, will optimize later
std::vector<Trade> readTradesBetweenTime(int64_t first_id, int64_t last_id) {
static std::vector<Trade> trade_buffer;
std::vector<Trade> ts;
return ts;
for (Trade t : trade_buffer)
if (t.id >= first_id && t.id <= last_id)
ts.push_back(t);
while (!feof(tr)) {
Trade t = Trade();
char maker[10], best_match[10];
fscanf(tr, "%ld,%lf,%lf,%lf,%ld,%s,%s", &t.id, &t.price, &t.quantity,
&t.quote_quantity, &t.time, maker, best_match);
t.maker = maker[0] == 'T', t.best_match = best_match[0] == 'T';
if (t.id >= first_id && t.id <= last_id)
ts.push_back(t);
else {
trade_buffer.push_back(t);
break;
}
}
}
std::vector<AggTrade> readAggTradesBetweenTime(int64_t open_time,
int64_t close_time) {
static std::vector<AggTrade> agg_trades_buffer;
std::vector<AggTrade> agg_trades;
for (AggTrade a : agg_trades_buffer)
if (a.timestamp >= open_time && a.timestamp <= close_time)
agg_trades.push_back(a);
while (!feof(at)) {
AggTrade a = AggTrade();
char maker[10], best_match[10];
fscanf(at, "%ld,%lf,%lf,%ld,%ld,%ld,%s,%s", &a.id, &a.price, &a.quantity,
&a.first_id, &a.last_id, &a.timestamp, maker, best_match);
a.maker = maker[0] == 'T', a.best_match = best_match[0] == 'T';
a.trades = readTradesBetweenTime(a.first_id, a.last_id);
if (a.timestamp >= open_time && a.timestamp <= close_time)
agg_trades.push_back(a);
else {
agg_trades_buffer.push_back(a);
break;
}
}
return agg_trades;
}
void readKLine() {
int cnt = 0;
while (!feof(k)) {
KLine kl = KLine();
int tmp;
fscanf(k, "%ld,%lf,%lf,%lf,%lf,%lf,%ld,%lf,%hd,%lf,%lf,%d", &kl.open_time,
&kl.open, &kl.high, &kl.low, &kl.close, &kl.volumn, &kl.close_time,
&kl.quote_vol, &kl.number_of_trade, &kl.taker_base_vol,
&kl.taker_quote_vol, &tmp);
kl.agg_trades = readAggTradesBetweenTime(kl.open_time, kl.close_time);
std::lock_guard<std::mutex> lock(mtx);
data_frames.push_back(kl);
if (data_frames.size() == 10)
break;
}
done = true;
}
void initParser(FILE *kl, FILE *t, FILE *a) {
k = kl, tr = t, at = a;
std::thread thr(readKLine);
thr.detach();
}
std::optional<KLine> getNextKLine() {
static int index = 0;
std::lock_guard<std::mutex> lock(mtx);
if (index < data_frames.size())
return data_frames[index++];
return std::optional<KLine>();
}
std::optional<std::vector<KLine>> getKLineInRange(int64_t open_time,
int64_t close_time) {
// std::lock_guard<std::mutex> lock(mtx);
// auto start = std::lower_bound(
// data_frames.begin(), data_frames.end(), open_time,
// [open_time](KLine t) { return t.open_time < open_time; });
// auto end = std::upper_bound(
// data_frames.begin(), data_frames.end(), close_time,
// [close_time](KLine t) { return t.close_time < close_time; })
// -
// 1;
// if (start != data_frames.end())
// return std::vector<KLine>(start, end);
return std::optional<std::vector<KLine>>();
}
bool isDone() { return done; }
该程序存在竞争条件,因为
done
不是原子的。 cout << "running\n";
可能会导致同步线程缓存并隐藏问题。该行为是未定义的。使变量std::atomic<bool> done = false;
。