我是 binance-websocket API 和 std::future 的新手。我写了一个程序来测量本地主机与币安服务器之间的时间差,并获取网络延迟。
我已将代码推送到我的 GitHub 存储库 ws_binance_time_diff_delay。
我有 2 个问题。
第一个问题:程序无法完整运行。
显示:
{"id":1,"status":200,"result":{"serverTime":1718697917878},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"M
INUTE","intervalNum":1,"limit":6000,"count":3}]}
type of future_example is class std::future<class std::vector<__int64,class std::allocator<__int64> > >
test group num 10; remain num 90
1718697921760; 1718697921956; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
然后它既没有响应也没有终止。
第二个问题:在文件 ws_diff_delay.cpp 的第84行和第85行中,如果我使用前者(第84行),程序显示:
{"id":1,"status":200,"result":{"serverTime":1718698030116},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"M
INUTE","intervalNum":1,"limit":6000,"count":3}]}
type of future_example is class std::future<class std::vector<__int64,class std::allocator<__int64> > >
test group num 10; remain num 90
1718698034095; 1718698034251; 1718698030291;
1718698034095; 1718698034314; 1718698030342;
1718698034095; 1718698034314; 1718698030342;
Process finished with exit code -1073740791 (0xC0000409)
它自行终止并显示 stackoverflow 消息“进程已完成,退出代码 -1073740791 (0xC0000409)”。
我调试并追踪到了第97行。
它停在汇编文件debug_assamble第24行
subl $0xd8, %esp
movq 0x2e3942(%rip), %rax
xorq %rsp, %rax
movq %rax, 0xc0(%rsp)
andq $0x0, 0x28(%rsp)
leaq -0x26(%rip), %rax ; RaiseException
andl $0x1, %edx
movl %ecx, 0x20(%rsp)
movl %edx, 0x24(%rsp)
movq %rax, 0x30(%rsp)
testq %r9, %r9
je 0x18004468a
movl $0xf, %eax
leaq 0x40(%rsp), %rcx
cmpl %eax, %r8d
movq %r9, %rdx
cmovbel %r8d, %eax
movl %eax, %r8d
movl %r8d, 0x38(%rsp)
shlq $0x3, %r8
callq 0x1800b7c77
leaq 0x20(%rsp), %rcx
callq *0x1f864c(%rip)
nopl (%rax,%rax)
movq 0xc0(%rsp), %rcx
xorq %rsp, %rcx
callq 0x1800af760
addq $0xd8, %rsp
retq
int3
andl $0x0, 0x38(%rsp)
jmp 0x180044660
int3
int3
int3
int3
int3
int3
int3
jno 0x18004465c
popq %rbx
异常发生前的场景如下(截图:异常发生前的场景)。
第84行和第85行有什么区别?
我打印第 72 行的类型以确保类型正确。
./CMakeLists.txt
# Build script for the ws_binance_time_diff_delay executable.
cmake_minimum_required(VERSION 3.24)
project(ws_binance_time_diff_delay)
set(CMAKE_CXX_STANDARD 17)

# Boost.Thread configuration macros, applied to every target in this directory.
add_definitions(
    -DBOOST_THREAD_PROVIDES_FUTURE
    -DBOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
    -DBOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
)

add_executable(ws_binance_time_diff_delay
    ws_diff_delay.cpp
)

# Project sources plus the author's local commonUtils directory (machine-specific path).
target_include_directories(ws_binance_time_diff_delay PRIVATE ${CMAKE_SOURCE_DIR})
target_include_directories(ws_binance_time_diff_delay PRIVATE "C:/Users/Mike_Wei/CLionProjects/commonUtils")

find_package(RapidJSON CONFIG REQUIRED)
target_link_libraries(ws_binance_time_diff_delay PRIVATE rapidjson)

find_package(OpenSSL REQUIRED)
find_package(ZLIB REQUIRED)

find_package(Boost REQUIRED COMPONENTS system thread date_time log log_setup program_options)
target_include_directories(ws_binance_time_diff_delay PRIVATE ${Boost_INCLUDE_DIRS})
target_link_libraries(ws_binance_time_diff_delay PRIVATE ${Boost_LIBRARIES})

find_package(Threads REQUIRED)
target_include_directories(ws_binance_time_diff_delay PRIVATE ${OPENSSL_INCLUDE_DIR})
target_link_libraries(ws_binance_time_diff_delay PRIVATE
    Boost::system
    Boost::log
    OpenSSL::SSL
    OpenSSL::Crypto
    Threads::Threads
)

find_package(jsoncpp CONFIG REQUIRED)
target_link_libraries(ws_binance_time_diff_delay PRIVATE JsonCpp::JsonCpp)

find_package(fmt CONFIG REQUIRED)
# The stray markdown fence that was fused onto this line has been removed;
# it is not valid CMake.
target_link_libraries(ws_binance_time_diff_delay PRIVATE fmt::fmt)
ws_diff_delay.cpp
#include <stdint.h>

#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <typeinfo>
#include <utility>
#include <vector>

#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/log/trivial.hpp>

#include <rapidjson/document.h>

#include <commonUtils.h>
class mini_ws_client {
using tcp = boost::asio::ip::tcp;
const std::string host = "testnet.binance.vision";
const std::string port = "443";
const std::string target = "/ws-api/v3";
boost::asio::ssl::context ctx{boost::asio::ssl::context::sslv23_client};
boost::asio::io_context ioc;
std::shared_ptr<boost::beast::websocket::stream<boost::asio::ssl::stream<tcp::socket> > > ws_sp;
public:
explicit mini_ws_client(
const std::string host = "testnet.binance.vision",
const std::string port = "443",
const std::string target = "/ws-api/v3"
): host(host), port(port), target(target) {
tcp::resolver resolver(ioc);
ws_sp = std::make_shared<boost::beast::websocket::stream<boost::asio::ssl::stream<tcp::socket> > >(ioc, ctx);
if (!SSL_set_tlsext_host_name(ws_sp->next_layer().native_handle(), host.c_str())) {
boost::system::error_code ec{static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category()};
throw boost::system::system_error{ec};
}
auto const results = resolver.resolve(host, port);
auto ep = boost::asio::connect(ws_sp->next_layer().next_layer(), results);
ws_sp->next_layer().handshake(boost::asio::ssl::stream_base::client);
ws_sp->set_option(boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::request_type &req) {
req.set(boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
}));
ws_sp->handshake(host, target);
}
~mini_ws_client() {
ws_sp->close(boost::beast::websocket::close_code::normal);
ws_sp.reset();
}
std::pair<double, double> get_diff_delay(
int test_times = 100,
int one_group_num = 10,
const std::string msg = "{\"id\":1,\"method\":\"time\"}"
) const {
auto future_example = std::async(
std::launch::async,
&mini_ws_client::get_one_timestamps,
this,
msg
);
std::cout << "type of future_example is " << typeid(future_example).name() << std::endl;
std::vector<int64_t> timeStamps{};
int cnt{test_times};
while (cnt) {
int cur_group_num{one_group_num};
if (cnt > one_group_num) {
cnt -= one_group_num;
} else {
cur_group_num = test_times;
cnt = 0;
}
std::cout << "test group num " << cur_group_num << "; " << "remain num " << cnt << std::endl;
std::vector<std::future<std::vector<int64_t> > > time_stamps_vec{};
// std::vector<decltype(future_example) > time_stamps_vec{};
for (int i{0}; i < cur_group_num; ++i) {
time_stamps_vec.emplace_back(
std::async(
std::launch::async,
&mini_ws_client::get_one_timestamps,
this,
msg
)
);
}
for (auto ×tamps_future: time_stamps_vec) {
auto one_timestamp = timestamps_future.get();
printVector(one_timestamp);
timeStamps.insert(timeStamps.end(), one_timestamp.begin(), one_timestamp.end());
}
}
auto diff_delay = calculateTimeDiffDelaySub(timeStamps);
return diff_delay;
}
std::vector<int64_t> get_one_timestamps(const std::string &msg) const {
std::vector<int64_t> timeStamps_one_case{};
auto presend_time = getTimeStamp();
auto response_str = request_to_response(msg);
auto postsend_time = getTimeStamp();
auto server_time = get_server_time(response_str);
if (server_time) {
timeStamps_one_case.push_back(presend_time);
timeStamps_one_case.push_back(postsend_time);
timeStamps_one_case.push_back(server_time.value());
}
printVector(timeStamps_one_case);
return timeStamps_one_case;
}
std::pair<double, double> calculateTimeDiffDelaySub(const std::vector<int64_t> &timeStamps) const {
double diff{0.};
double delay{0.};
for (int i{0}; i < timeStamps.size(); i += 3) {
diff += (
timeStamps[i]
+ timeStamps[i + 1]
- (timeStamps[i + 2] << 1)
) * 0.5;
delay += (timeStamps[i + 1] - timeStamps[i]) >> 1;
}
diff /= timeStamps.size() / 3;
delay /= timeStamps.size() / 3;
auto ret = std::make_pair(diff, delay);
return ret;
}
int64_t getTimeStamp(int64_t diff = 0LL) const {
// 获取当前时间点
auto now = std::chrono::system_clock::now();
// 将时间点转换为毫秒
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
// 输出 13 位时间戳
return millis - diff;
}
std::string request_to_response(const std::string &msg) const {
ws_sp->write(boost::asio::buffer(msg));
boost::beast::flat_buffer buffer;
ws_sp->read(buffer);
const char *bufferData = reinterpret_cast<const char *>(buffer.data().data());
std::size_t bufferSize = buffer.data().size();
std::string response(bufferData, bufferSize);
return response;
}
std::optional<int64_t> get_server_time(const std::string &msg) const {
rapidjson::Document msg_doc;
msg_doc.Parse(msg.c_str());
if (msg_doc.HasParseError() || !msg_doc.IsObject()) {
BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " << "JSON parse error or not an object" <<
std::endl;
return std::nullopt;
}
if (!msg_doc.HasMember("result") || !msg_doc["result"].IsObject()) {
BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " <<
"No 'result' field or 'result' is not an object" << std::endl;
return std::nullopt;
}
const rapidjson::Value &result = msg_doc["result"];
if (!result.HasMember("serverTime") || !result["serverTime"].IsInt64()) {
BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " <<
"'serverTime' field missing or not an int64"
<< std::endl;
return std::nullopt;
}
int64_t serverTime = result["serverTime"].GetInt64();
return serverTime;
}
};
/// Connects to ws-api.binance.com, prints one raw response to the "time"
/// request, then prints the averaged diff and delay from get_diff_delay().
int main() {
    auto ws_client = std::make_shared<mini_ws_client>("ws-api.binance.com", "443", "/ws-api/v3");
    std::cout << ws_client->request_to_response("{\"id\":1,\"method\":\"time\"}") << std::endl;
    // get_diff_delay() returns a temporary pair. A non-const lvalue reference
    // (`auto &[...]`) cannot bind to it in standard C++, so bind by value.
    const auto [diff, delay] = ws_client->get_diff_delay();
    std::cout << "diff = " << diff << std::endl;
    std::cout << "delay = " << delay << std::endl;
}
0xc0000409 表示 STATUS_STACK_BUFFER_OVERRUN
使用 sanitizer(地址与未定义行为检测工具)运行代码:
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address,undefined ")
这样可以快速诊断出一些问题:
显然某些缓冲区在写入操作期间无效。
我把您的代码重构得更易读:https://coliru.stacked-crooked.com/a/079b4b92556cd365
// using Sample = std::array<int64_t, 3>;
/// One measurement: the three int64_t values of a single request, in the
/// order pre, post, server.
struct Sample {
    int64_t pre;
    int64_t post;
    int64_t server;
};

/// A sequence of measurements.
using Samples = std::vector<Sample>;
/// Averages the per-sample offset and delay.
/// @param samples measurements with fields pre, post and server.
/// @return {diff, delay}: diff is the mean of (pre + post) / 2 - server,
///         delay is the mean of (post - pre) / 2; {0, 0} for empty input.
std::pair<double, double> calculateTimeDiffDelaySub(Samples const& samples) const {
    double diff{0.};
    double delay{0.};
    // Guard the empty case: dividing 0.0 by samples.size() == 0 yields NaN.
    if (!samples.empty()) {
        for (auto const& [pre, post, server] : samples) {
            diff += (pre + post - (server * 2)) * 0.5;
            delay += (post - pre) / 2.0;
        }
        diff /= samples.size();
        delay /= samples.size();
    }
    return std::pair(diff, delay);
}
无论如何,您创建了许多不受管理的线程,它们在没有任何同步的情况下共用同一个 WebSocket。这既不安全,也没有意义。
针对并行批次进行重构
在单个客户端上,请求必须按顺序依次执行,因此每个并行批次都使用各自的客户端:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <iostream>
#include <rapidjson/document.h>
#include <fmt/ranges.h>
/// Prints `v` to stdout through fmt, prefixed with "printVector: " and
/// followed by a newline.
template <typename Range>
static inline void printVector(Range const& v) {
    fmt::print("printVector: {}\n", v);
}
// using Sample = std::array<int64_t, 3>;
/// One measurement: the three int64_t values of a single request, in the
/// order pre, post, server.
struct Sample {
    int64_t pre;
    int64_t post;
    int64_t server;
};

/// A sequence of measurements.
using Samples = std::vector<Sample>;

/// Shared, immutable request payload.
using Message = std::shared_ptr<const std::string>;
/// Returns the shared default request payload, `{"id":1,"method":"time"}`.
/// The string is created on first call and the same instance is handed out
/// on every call.
static Message default_message() {
    static const auto shared_payload = std::make_shared<std::string>(R"({"id":1,"method":"time"})");
    return shared_payload;
}
// Short aliases for the Boost namespaces and types used by the code below.
namespace asio = boost::asio;
namespace ssl = asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp = asio::ip::tcp;
using error_code = boost::system::error_code;
// Enables duration literals such as `1ms` (used by Benchmark::getTimeStamp).
using namespace std::chrono_literals;
/// Connection parameters for Client: host name, port, and WebSocket target
/// path. The defaults are the values shown in the initializers below.
struct Args {
    std::string host{"testnet.binance.vision"};
    std::string port{"443"};
    std::string target{"/ws-api/v3"};
};
class Client {
using tcp = asio::ip::tcp;
Args const args;
ssl::context ctx{ssl::context::sslv23_client};
asio::io_context ioc;
websocket::stream<ssl::stream<tcp::socket>> ws{ioc, ctx};
public:
Client(Args args = {}) : args(std::move(args)) { connect(); }
~Client() {
error_code ec;
ws.close(websocket::close_code::normal, ec);
if (ec.failed())
std::cerr << "~Client: " << ec.message() << std::endl;
}
std::string request(Message msg) {
ws.write(asio::buffer(*msg));
std::string response;
auto buf = asio::dynamic_buffer(response);
ws.read(buf);
return response;
}
private:
void connect() {
tcp::resolver resolver(ioc);
if (!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), args.host.c_str()))
throw boost::system::system_error{
error_code{static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()}};
auto results = resolver.resolve(args.host, args.port);
asio::connect(ws.next_layer().next_layer(), results);
ws.next_layer().handshake(ssl::stream_base::client);
ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
}));
ws.handshake(args.host, args.target);
}
};
/// Measurement helpers: timestamping, response parsing, averaging, and the
/// batch driver that runs requests through per-batch Client connections.
namespace Benchmark {
    using clock = std::chrono::steady_clock;
    using time_point = clock::time_point;

    static time_point now() { return clock::now(); }
    // Reference point captured during static initialization.
    static time_point time0 = now();

    /// Whole milliseconds elapsed on the steady clock since `time0`.
    /// NOTE(review): these values are relative to `time0`, while the server
    /// time in the sample output looks like epoch milliseconds. If so, the
    /// averaged `diff` is not an absolute clock offset; confirm whether
    /// system_clock epoch time is wanted here.
    int64_t getTimeStamp() { return (now() - time0) / 1ms; }

    /// Extracts the int64 `result.serverTime` field from a JSON response.
    /// @return the value, or an empty optional (after writing a message to
    ///         stderr) if the text is not a JSON object with that field.
    std::optional<int64_t> get_server_time(std::string const& msg) {
        rapidjson::Document doc;
        doc.Parse(msg.c_str());
        if (!doc.HasParseError() && doc.IsObject() && doc.HasMember("result"))
            if (auto const& result = doc["result"];
                result.IsObject() && result.HasMember("serverTime") && result["serverTime"].IsInt64())
                return result["serverTime"].GetInt64();
        std::cerr << "Unexpected or invalid response" << std::endl;
        return {};
    }

    /// Averages the per-sample offset and delay.
    /// @return {diff, delay}: diff is the mean of (pre + post) / 2 - server,
    ///         delay is the mean of (post - pre) / 2; {0, 0} for empty input.
    std::pair<double, double> calculateTimeDiffDelaySub(Samples const& samples) {
        double diff{0.};
        double delay{0.};
        if (!samples.empty()) {
            for (auto& [pre, post, server] : samples) {
                diff += (pre + post - (server * 2)) * 0.5;
                delay += (post - pre) / 2.0;
            }
            diff /= samples.size();
            delay /= samples.size();
        }
        return std::pair(diff, delay);
    }

    /// Opens one connection and performs `n` sequential requests on it.
    /// @return the samples collected so far. A boost::system::system_error is
    ///         logged to stderr and ends the batch early.
    /// @throws std::runtime_error if a response carries no server time; this
    ///         is not caught here and reaches the caller.
    Samples perform_batch(Args const& args, unsigned n, Message msg) {
        Samples samples;
        samples.reserve(n);
        try {
            // Exactly one connection per batch. A second, unused Client would
            // only add a TLS/WebSocket handshake and count against rate limits.
            Client client(args);
            for (unsigned i = 0; i < n; ++i) {
                int64_t pre = getTimeStamp();
                std::string res = client.request(msg);
                int64_t post = getTimeStamp();
                if (std::optional<int64_t> server_time = get_server_time(res))
                    samples.push_back({pre, post, server_time.value()});
                else
                    throw std::runtime_error("No server time in response");
            }
        } catch (boost::system::system_error const& se) {
            std::cerr << "Error in perform_batch: " << se.code().message() << std::endl;
        }
        return samples;
    }

    /// Runs groups of parallel batches until `total` is used up and returns
    /// the averaged {diff, delay} over all collected samples.
    /// Each group launches `batch` std::async tasks, and each task calls
    /// perform_batch() with n == `batch`, so one group contributes up to
    /// batch * batch samples. An exception thrown inside a task is rethrown
    /// here by future::get().
    std::pair<double, double> run(Args const& args, unsigned total = 100, unsigned pergroup = 10,
                                  Message msg = default_message()) {
        Samples merged;
        for (unsigned remain{total}; unsigned batch = std::min(remain, pergroup); remain -= batch) {
            std::cout << "batch " << batch << "; remain " << remain << std::endl;
            std::vector<std::future<Samples>> futs{};
            for (unsigned i = 0; i < batch; ++i)
                futs.emplace_back(std::async(std::launch::async, perform_batch, args, batch, msg));
            for (auto& fut : futs) {
                auto one = fut.get();
                merged.insert(merged.end(), one.begin(), one.end());
            }
        }
        return calculateTimeDiffDelaySub(merged);
    }
}
int main() {
Args args {"ws-api.binance.com", "443", "/ws-api/v3"};
std::cout << Client(args).request(default_message()) << std::endl;
auto const& [diff, delay] = Benchmark::run(args, 20, 4);
std::cout << "diff=" << diff << " delay=" << delay << std::endl;
}
客户端析构期间偶发的错误表明我们可能触发了速率限制,这一点留给您自行诊断。完整的本地演示(注意它运行得有多慢):