我正在尝试修改一些Boost代码,使其与AutoHotKey兼容。原始项目可以在here找到。我的版本可以在这里找到。我可以使用一些帮助来确定如何防止多个并发回调进入用户提供的 AutoHotKey 例程。
这是现有的 on_read 回调 --
/// Callback registered by async_read. It calls user registered callback to actually process the data. And then issue another async_read to wait for data from server again.
/// \param ec instance of error code
/// \param bytes_transferred
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred) {
if(EnableVerbose)
{
boost::lock_guard<boost::mutex> guard(mtx_);
std::wcout << L"<WsDll-" ARCH_LABEL "> in on read" << std::endl;
}
boost::ignore_unused(bytes_transferred);
{
boost::lock_guard<boost::mutex> guard(mtx_);
if(!Is_Connected) {
return;
}
}
// error occurs
if (ec) {
if(on_fail_cb)
on_fail_cb(L"read");
return fail(ec, L"read");
}
const std::string data = beast::buffers_to_string(buffer_.data());
const std::wstring wdata(data.begin(), data.end());
if(EnableVerbose)
{
boost::lock_guard<boost::mutex> guard(mtx_);
std::wcout << L"<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << L"] " << wdata << std::endl;
}
// The next section is where my issue resides
if (on_data_cb)
on_data_cb(wdata.c_str(), wdata.length());
buffer_.consume(buffer_.size());
if(EnableVerbose)
{
boost::lock_guard<boost::mutex> guard(mtx_);
std::wcout << L"<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
}
ws_.async_read(
buffer_,
beast::bind_front_handler(
&session::on_read,
shared_from_this()));
// Close the WebSocket connection
// ws_.async_close(websocket::close_code::normal,
// beast::bind_front_handler(
// &session::on_close,
// shared_from_this()));
}
代码
if (on_data_cb) on_data_cb(wdata.c_str(), wdata.length());
将回调执行到AutoHotKey中,我需要知道如何防止它一次执行多次。我不太精通 C++ / Boost,所以请温柔一点。 ;-)
温和的答案是指向文档:Strands:使用没有显式锁定的线程
实际上,您没有显示足够的代码。例如,我们无从得知
正在使用什么执行上下文。如果您使用带有单个服务线程的
io_context
run()
,那么您已经拥有隐式链并保证没有处理程序同时运行
IO 对象绑定到哪个执行器。在您的代码中,唯一可见的对象是
ws_
,我们假设它类似于
net::io_context ctx_;
websocket::stream<tcp::socket> ws_{ctx_};
现在,如果您想要多个线程服务
ctx_
,您可以将 ws_
绑定到链执行器:
websocket::stream<tcp::socket> ws_{make_strand(ctx_)};
现在,只要您确保自己的访问(例如 async_ 启动)位于正确的链上,您的代码就已经安全了。如果您愿意 - 并且您不介意对执行器类型进行硬编码,您可以断言:
自动链= ws_.get_executor().targetnet::strand
专业提示:
如果您确实致力于特定的执行程序类型,请考虑静态绑定该类型:
using Context = net::io_context::executor_type; using Executor = net::io_context::executor_type; using Strand = net::strand<net::io_context::executor_type>; using Socket = net::basic_stream_socket<tcp, Strand>; Context ctx_; websocket::stream<Socket> ws_{make_strand(ctx_)};
这避免了类型擦除执行器的开销,您可以 简化断言:
assert(ws_.get_executor().running_in_this_thread());
不要锁定完整的互斥体来检查布尔值,而是使用
atomic_bool
。
使用标准库功能(
std::mutex
、std::lock_guard
)让事情变得更容易、更好
考虑针对 ANSI 构建进行构建,或检查您的扩展方法:
const std::wstring wdata(data.begin(), data.end());
是一种反模式。请参阅此处 https://en.cppreference.com/w/cpp/string/multibyte 或此处了解更多信息 https://en.cppreference.com/w/cpp/locale/ctype/widen
强制性“实时”代码:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using net::ip::tcp;
static std::mutex s_consoleMtx;
static void fail(beast::error_code ec, std::string txt) {
std::cerr << txt << ": " << ec.message() << " at " << ec.location() << std::endl;
}
#define ARCH_LABEL "STACKO"
struct session : std::enable_shared_from_this<session> {
using Context = net::io_context::executor_type;
using Executor = net::io_context::executor_type;
using Strand = net::strand<net::io_context::executor_type>;
using Socket = net::basic_stream_socket<tcp, Strand>;
Context ctx_;
websocket::stream<Socket> ws_{make_strand(ctx_)};
static bool const EnableVerbose = true;
std::atomic_bool Is_Connected = false;
beast::flat_buffer buffer_;
std::function<void(std::string)> on_fail_cb;
std::function<void(char const*, size_t)> on_data_cb;
/// Callback registered by async_read. It calls user registered
/// callback to actually process the data. And then issue another
/// async_read to wait for data from server again.
/// \param ec instance of error code
/// \param bytes_transferred
void on_read(beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
if (EnableVerbose) {
std::lock_guard<std::mutex> guard(s_consoleMtx);
std::cout << "<WsDll-" ARCH_LABEL "> in on read" << std::endl;
}
if (!Is_Connected)
return;
// error occurs
if (ec) {
if (on_fail_cb)
on_fail_cb("read");
return fail(ec, "read");
}
std::string const data = beast::buffers_to_string(buffer_.data());
if (EnableVerbose) {
std::lock_guard<std::mutex> guard(s_consoleMtx);
std::cout << "<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << "] " << data << std::endl;
}
if (on_data_cb)
on_data_cb(data.c_str(), data.length());
buffer_.consume(buffer_.size());
if (EnableVerbose) {
std::lock_guard<std::mutex> guard(s_consoleMtx);
std::cout << "<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
}
assert(ws_.get_executor().running_in_this_thread());
ws_.async_read(buffer_, beast::bind_front_handler(&session::on_read, shared_from_this()));
}
};