我有以下代码:
// FIFO queue of JSON messages waiting to be sent; do_write() takes messages from the front.
std::queue< nlohmann::json > outgoingMessages;
void session::do_write( void ) {
if ( outgoingMessages.size() > 0 ) {
auto message = outgoingMessages.front();
outgoingMessages.pop();
ws_.async_write( boost::asio::buffer( message.dump() ), boost::beast::bind_front_handler( & session::on_write, shared_from_this() ) );
}
};
/// Completion handler for the asynchronous write started by do_write().
///
/// @param errorCode          Result of the write; a set error is passed to
///                           fail() with the tag "write" and nothing else is done.
/// @param bytes_transferred  Number of bytes the write reported; a value of
///                           zero triggers a one millisecond pause.
///
/// After a successful write, do_write() is called again to continue with the
/// next queued message.
void session::on_write( boost::beast::error_code errorCode, std::size_t bytes_transferred ) {
    if ( errorCode ) {
        fail( errorCode, "write" );
        return;
    }

    const bool wroteNothing = ( bytes_transferred == 0 );
    if ( wroteNothing ) {
        std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
    }

    do_write();
}
我只需要它在有数据要写入时才执行写入。问题是,当队列为空时,`async_write()` 不会被调用,`on_write` 也就不会再被触发,写入循环因此中断。在队列中待发送的消息用完之前,一切都运行正常。
我遇到过同样的问题,解决办法是递归调用 `do_write()`,并在每次调用时检查队列中是否有新的项目。
只要确保不会同时发起多个 `async_write()` 调用,就不会有问题。
尝试将代码更改为:
// Pending outgoing JSON messages; do_write() reads the front element and on_write() pops it.
std::queue< nlohmann::json > outgoingMessages;
// Set to true by do_write() when it starts an async_write and cleared by on_write().
bool isWriting = false;
void session::do_write( void ) {
if ( !isWriting && outgoingMessages.size() > 0 ) {
isWriting = true;
auto message = outgoingMessages.front();
ws_.async_write( boost::asio::buffer( outgoingMessages.dump() ), boost::beast::bind_front_handler( & session::on_write, shared_from_this() ) );
} else
{
// Repeat write
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
do_write();
};
void session::on_write( boost::beast::error_code errorCode, std::size_t bytes_transferred ) {
if ( errorCode )
return fail( errorCode, "write" );
if ( bytes_transferred == 0 )
std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
outgoingMessages.pop();
// Repeat write for additional messages
if (!outgoingMessages.empty())
{
ws_.async_write(
boost::asio::buffer(outgoingMessages.front()),
beast::bind_front_handler(
&session::on_write,
shared_from_this()));
}
else
{
isWriting = false;
do_write();
}
};
您必须同步对写入队列的访问,这样向队列添加消息的线程和从队列取出消息的线程才不会互相干扰!
最简单的方法是使用 websocket 的 strand(Asio 中用于串行化处理程序执行的执行器):
class session : public std::enable_shared_from_this<session>
{
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
std::queue<std::string> outgoingMessages;
bool isWriting { false }
// More data fields omitted
// Code for connecting the websocket etc. omitted
// Write out the next pending message, if no write is active:
void do_write(void) {
if(! isWriting && ! outgoingMessages.empty()) {
auto message = std::move(outgoingMessages.front());
outgoingMessages.pop_front();
ws_.async_write(boost::asio::buffer(std::move(message)),
boost::beast::bind_front_handler(&session::on_write, shared_from_this()));
}
}
void on_write(boost::beast::error_code errorCode, std::size_t) {
isWriting = false;
if(errorCode)
return fail(errorCode, "write");
do_write();
}
public:
// Access to the websocket is serialized by a strand!
session(boost::asio::io_context& ioc, boost::asio::ssl::context& ctx)
: ws_(boost::asio::make_strand(ioc), ctx) {}
// May be called from any thread, but dispatches the actual enqueuing
// to the websocket's strand to ensure thread safety:
void enqueue(const nlohmann::json& jsonmessage) {
boost::asio::dispatch(ws_.get_executor(), [msg = jsonmessage.dump()] () mutable
{
outgoingMessages.emplace_back(std::move(msg));
do_write();
});
}
};