在一个协程的子协程中使用 websocket 时卡在 co_await

问题描述 投票:0回答:1

我想在两个协程中共用同一个 ws 对象:一个协程负责发送,另一个负责接收。然而,当我在已有的协程中再启动子协程时,代码会卡在对 async_write 的 co_await 上(即“while”那一部分)。我启动子协程的方式有什么问题吗?在父协程中用 while(1) 调用 async_write 可以正常工作,但放到子协程里只能成功一次,下一次就卡住了。

这是我的客户端代码。

//
// Copyright (c) 2022 Klemens D. Morgenstern (klemens dot morgenstern at gmx dot net)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//

//------------------------------------------------------------------------------
//
// Example: WebSocket client, coroutine
//
//------------------------------------------------------------------------------

#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <string>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <thread>
#include <chrono>

#if defined(BOOST_ASIO_HAS_CO_AWAIT)

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

//------------------------------------------------------------------------------

// Sends `text` once as a single WebSocket message and then completes.
// Despite the name, this coroutine contains no loop.
//
// ws   - WebSocket stream whose async_write result can be co_await-ed.
// text - payload; taken by value, so the coroutine owns its own copy.
net::awaitable<void> do_write_loop(auto& ws, std::string text) {
    std::cout << "before while";
    // The message is sent from a separate copy of `text`, held for the
    // duration of the write.
    std::string const payload(text);
    co_await ws.async_write(net::buffer(payload));
}

// Reads WebSocket messages in an endless loop and echoes each one to stdout.
//
// ws - WebSocket stream whose async_read result can be co_await-ed.
//
// The loop has no exit condition of its own; it is left only if the
// co_await on async_read throws.
net::awaitable<void> do_read_loop(auto& ws) {
    beast::flat_buffer buffer;
    while (true) {
        std::cout << "do_read_loop";
        co_await ws.async_read(buffer);
        std::cout << beast::make_printable(buffer.data()) << std::endl;
        // async_read appends to the dynamic buffer, so discard what was just
        // printed; otherwise every later message would be printed together
        // with all earlier ones.
        buffer.consume(buffer.size());
    }
}

// Runs one client session: resolves host/port, connects, performs the
// WebSocket handshake, then starts do_write_loop and do_read_loop on a second
// io_context (ioc2) and runs that context before closing the connection.
//
// host - server name or address; the connected port is appended to it before
//        the handshake.
// port - service name or port number passed to the resolver.
// text - payload handed to do_write_loop.
net::awaitable<void> do_session(
    std::string host,
    std::string port,
    std::string text)
{
    // Both the resolver and the WebSocket stream are bound to the executor of
    // this coroutine and use use_awaitable as their default completion token.
    auto resolver = net::use_awaitable.as_default_on(
        tcp::resolver(co_await net::this_coro::executor));
    auto ws = net::use_awaitable.as_default_on(
        websocket::stream<beast::tcp_stream>(co_await net::this_coro::executor));
    auto const results = co_await resolver.async_resolve(host, port);
    // 30 second deadline on the TCP layer while connecting.
    beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));
    auto ep = co_await beast::get_lowest_layer(ws).async_connect(results);
    // The handshake below is given "host:port" as its host argument.
    host += ':' + std::to_string(ep.port());
    // Drop the TCP-level deadline; the websocket timeout option set on the
    // next line is used from here on.
    beast::get_lowest_layer(ws).expires_never();
    ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
    // Decorate the upgrade request with a User-Agent header.
    ws.set_option(websocket::stream_base::decorator(
        [](websocket::request_type& req)
        {
            req.set(http::field::user_agent,
            std::string(BOOST_BEAST_VERSION_STRING) +
            " websocket-client-coro");
        }));
    co_await ws.async_handshake(host, "/");
    std::cout << "after handshake";
    //while (1) {
    //    co_await ws.async_write(net::buffer(std::string(text)));
    //}






    // NOTE(review): `ws` was created on this coroutine's executor, yet the two
    // child coroutines below are spawned on a separate io_context, and
    // ioc2.run() is then called synchronously from inside this coroutine.
    // While that call blocks, the context that owns `ws` presumably cannot
    // make progress, which would explain a write that never completes.
    // Consider spawning on `executor` instead - confirm.
    net::io_context ioc2;
    // NOTE(review): `executor` is obtained here but not used anywhere below.
    auto executor = co_await boost::asio::this_coro::executor;
    // Start the write and read loops concurrently
        // Both coroutines receive the same `ws` by reference. The completion
        // handler prints any std::exception the coroutine ended with.
        net::co_spawn(ioc2,
            do_write_loop(ws, text), [](std::exception_ptr e)
            {
                if (e)
                    try
                {
                    std::cout << "enter e";
                    std::rethrow_exception(e);
                }
                catch (std::exception& e)
                {
                    std::cerr << "Error: " << e.what() << "\n";
                }
            });
    net::co_spawn(ioc2,
        do_read_loop(ws), [](std::exception_ptr e)
        {
            if (e)
                try
            {
                std::rethrow_exception(e);
            }
            catch (std::exception& e)
            {
                std::cerr << "Error: " << e.what() << "\n";
            }
        });
    // Blocking call: control does not return to this coroutine until ioc2.run()
    // returns (see the review note above).
    ioc2.run();
    //ioc3.run();
    // Wait for both tasks to complete (this will never happen)
    //co_await net::when_all(std::move(write_loop_task), std::move(read_loop_task));

    // Close the WebSocket connection (this will never be reached)
    co_await ws.async_close(websocket::close_code::normal);
}

//------------------------------------------------------------------------------

int main(int argc, char **argv)
{
    // Command-line parsing is currently switched off; the endpoint and the
    // payload are hard-coded further down. The disabled handling is kept here
    // for reference.
    /*if (argc != 4)
    {
        std::cerr << "Usage: websocket-client-awaitable <host> <port> <text>\n"
                  << "Example:\n"
                  << "    websocket-client-awaitable echo.websocket.org 80 \"Hello, world!\"\n";
        return EXIT_FAILURE;
    }*/
    //auto const host = argv[1];
    //auto const port = argv[2];
    //auto const text = argv[3];
    char const *const host = "127.0.0.1";
    char const *const port = "12345";
    char const *const text = "argv[3]";

    // Completion handler for the session coroutine: if the coroutine ended
    // with a std::exception, print its message to stderr.
    auto report_failure = [](std::exception_ptr failure)
    {
        if (!failure)
            return;
        try
        {
            std::rethrow_exception(failure);
        }
        catch (std::exception &error)
        {
            std::cerr << "Error: " << error.what() << "\n";
        }
    };

    // All asynchronous work of the session is driven by this context.
    net::io_context ioc;
    net::co_spawn(ioc, do_session(host, port, text), report_failure);

    // Blocks until the context has no more work to do.
    ioc.run();

    return EXIT_SUCCESS;
}

#else

// Fallback entry point, compiled only when BOOST_ASIO_HAS_CO_AWAIT is not
// defined: reports that coroutine support is missing and exits with status 1.
int main(int, char *[])
{
    // Written through std::cout because <iostream> is included at the top of
    // the file, whereas <cstdio> (needed for std::printf) is not.
    std::cout << "awaitables require C++20\n";
    return 1;
}

#endif
boost c++20 boost-asio coroutine boost-coroutine
1个回答
0
投票

您没有必要使用第二个 io_context,而且您还在协程内部对它进行了阻塞式的调用:

ioc2.run();

应改为使用当前协程自己的执行器:

auto ex = co_await boost::asio::this_coro::executor;
// Start the write and read loops concurrently
co_spawn(ex, do_write_loop(ws, text), rethrow);
co_spawn(ex, do_read_loop(ws), rethrow);

或者更好的做法是,使用 awaitable 运算符(boost::asio::experimental::awaitable_operators)来协调并行的工作:

co_await (do_write_loop(ws, text) && do_read_loop(ws));

请注意,您给出的 do_write_loop 根本不是循环:它只写入一次,然后就会结束。

现场演示

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/beast.hpp>
#include <iostream>
namespace beast     = boost::beast;         // from <boost/beast.hpp>
namespace http      = beast::http;          // from <boost/beast/http.hpp>
namespace websocket = beast::websocket;     // from <boost/beast/websocket.hpp>
namespace net       = boost::asio;          // from <boost/asio.hpp>
using tcp           = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
using Resolver      = net::deferred_t::as_default_on_t<tcp::resolver>;
using WebSock       = net::deferred_t::as_default_on_t<websocket::stream<beast::tcp_stream>>;
using namespace std::chrono_literals;
using namespace net::experimental::awaitable_operators;

//------------------------------------------------------------------------------

// Writes `text` to the peer as a single WebSocket message, then completes.
// Despite the name, there is no loop in this coroutine.
//
// ws   - WebSocket stream to write to.
// text - payload; held by reference, so the caller must keep the string alive
//        until this coroutine has finished.
net::awaitable<void> do_write_loop(WebSock& ws, std::string const& text) {
    std::cout << "before while" << std::endl;
    auto const payload = net::buffer(text);
    co_await ws.async_write(payload);
}

// Reads WebSocket messages in an endless loop and echoes each one to stdout.
//
// ws - WebSocket stream to read from.
//
// The loop has no exit condition of its own; it is left only if the
// co_await on async_read throws.
net::awaitable<void> do_read_loop(WebSock& ws) {
    beast::flat_buffer buffer;
    while (true) {
        std::cout << "do_read_loop" << std::endl;
        co_await ws.async_read(buffer);
        std::cout << beast::make_printable(buffer.data()) << std::endl;
        // async_read appends to the dynamic buffer, so discard what was just
        // printed; otherwise every later message would be printed together
        // with all earlier ones.
        buffer.consume(buffer.size());
    }
}

// Runs one client session: resolve, connect (30 s deadline), WebSocket
// handshake, then run do_write_loop and do_read_loop side by side on this
// coroutine's executor, and finally close the connection.
//
// host - server name or address; the connected port is appended to it before
//        the handshake.
// port - service name or port number passed to the resolver.
// text - payload handed to do_write_loop.
net::awaitable<void> do_session(std::string host, std::string port, std::string text) {
    auto executor = co_await net::this_coro::executor;
    WebSock ws(executor);
    auto& tcp_layer = beast::get_lowest_layer(ws);

    // The deadline is armed before name resolution starts and removed once
    // the TCP connection is up.
    tcp_layer.expires_after(30s);
    auto const endpoints = co_await Resolver(executor).async_resolve(host, port);
    auto ep = co_await tcp_layer.async_connect(endpoints);
    tcp_layer.expires_never();

    std::cout << "TCP connected to " << ep << std::endl;
    host += ':';
    host += std::to_string(ep.port());

    // Stream options: suggested client timeouts and a fixed User-Agent header
    // on the upgrade request.
    ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
    ws.set_option(websocket::stream_base::decorator([](websocket::request_type& request) {
        request.set(http::field::user_agent, "my-client-1.0");
    }));

    co_await ws.async_handshake(host, "/");
    std::cout << "after handshake" << std::endl;

    // Waits for both coroutines; do_read_loop loops without an exit
    // condition, so this normally only ends through an exception.
    co_await (do_write_loop(ws, text) && do_read_loop(ws));

    // Close the WebSocket connection (not expected to be reached, see above)
    co_await ws.async_close(websocket::close_code::normal);
}

//------------------------------------------------------------------------------
// Completion handler for co_spawn: does nothing for an empty exception_ptr;
// otherwise rethrows the stored exception and, if it derives from
// std::exception, prints its message to stderr. Any other exception type
// propagates to the caller.
static inline void rethrow(std::exception_ptr ep) {
    if (!ep) {
        return;
    }
    try {
        std::rethrow_exception(ep);
    } catch (std::exception const& failure) {
        std::cerr << "\n rethrow_exception: " << failure.what() << "\n";
    }
}

// Entry point of the demo: starts one session against a fixed local endpoint
// and drives it until the io_context has no more work.
int main() {
    net::io_context ioc;

    // Arguments are host, port and the text to send; `rethrow` reports a
    // std::exception that ends the session.
    co_spawn(ioc, do_session("127.0.0.1", "12345", "hello world"), rethrow);

    ioc.run();
}

© www.soinside.com 2019 - 2024. All rights reserved.