Boost Beast async_write() 与队列

问题描述 投票:0回答:2

我有以下代码:

std::queue< nlohmann::json > outgoingMessages;

void session::do_write( void ) {
    if ( outgoingMessages.size() > 0 ) {
        auto message = outgoingMessages.front();
        outgoingMessages.pop();

        ws_.async_write( boost::asio::buffer( message.dump() ), boost::beast::bind_front_handler( & session::on_write, shared_from_this() ) );
    }
};

void session::on_write( boost::beast::error_code errorCode, std::size_t bytes_transferred ) {
    if ( errorCode )
        return fail( errorCode, "write" );

    if ( bytes_transferred == 0 )
        std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );

    do_write();
};

我需要它做的就是只有在有数据要写入时才写入。问题是,当队列为空时,永远不会调用

async_write()
,从而打破循环。效果很好,直到用完要发送的队列项目为止。

c++ asynchronous queue boost-beast
2个回答
0
投票

我遇到了同样的问题,并通过递归调用

do_write()
并检查队列是否包含新项目来修复它。只要您确保
async_write()
没有同时调用,就应该没问题。

尝试将代码更改为:

std::queue< nlohmann::json > outgoingMessages;
bool isWriting = false;

void session::do_write( void ) {
    if ( !isWriting && outgoingMessages.size() > 0 ) {
        isWriting = true;
        auto message = outgoingMessages.front();
        
        ws_.async_write( boost::asio::buffer( outgoingMessages.dump() ), boost::beast::bind_front_handler( & session::on_write, shared_from_this() ) );
    } else
    {
        // Repeat write
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        do_write();
};

void session::on_write( boost::beast::error_code errorCode, std::size_t bytes_transferred ) {
    if ( errorCode )
        return fail( errorCode, "write" );

    if ( bytes_transferred == 0 )
        std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );

    outgoingMessages.pop();

    // Repeat write for additional messages
    if (!outgoingMessages.empty())
    {
        ws_.async_write(
            boost::asio::buffer(outgoingMessages.front()),
            beast::bind_front_handler(
                &session::on_write,
                shared_from_this()));
    }
    else
    {
        isWriting = false;
        do_write();
    }
};

0
投票

您必须同步对写入队列的访问,以便添加到其中的线程和从中删除的线程不会互相干扰!

最简单的方法是使用 websocket 的链:

class session : public std::enable_shared_from_this<session>
{
    websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
    std::queue<std::string> outgoingMessages;
    bool isWriting { false }
    // More data fields omitted

    // Code for connecting the websocket etc. omitted

    // Write out the next pending message, if no write is active:
    void do_write(void) {
        if(! isWriting && ! outgoingMessages.empty()) {
            auto message = std::move(outgoingMessages.front());
            outgoingMessages.pop_front();
            ws_.async_write(boost::asio::buffer(std::move(message)),
                            boost::beast::bind_front_handler(&session::on_write, shared_from_this()));
        }
    }

    void on_write(boost::beast::error_code errorCode, std::size_t) {
        isWriting = false;
        if(errorCode)
            return fail(errorCode, "write");
        do_write();
    }

public:
    // Access to the websocket is serialized by a strand!
    session(boost::asio::io_context& ioc, boost::asio::ssl::context& ctx)
      : ws_(boost::asio::make_strand(ioc), ctx) {}


    // May be called from any thread, but dispatches the actual enqueuing
    // to the websocket's strand to ensure thread safety:
    void enqueue(const nlohmann::json& jsonmessage) {
        boost::asio::dispatch(ws_.get_executor(), [msg = jsonmessage.dump()] () mutable
        {
            outgoingMessages.emplace_back(std::move(msg));
            do_write();
        });
    }
};
© www.soinside.com 2019 - 2024. All rights reserved.