我想在两个协程中共用同一个 ws 对象:一个协程负责发送,另一个协程负责接收。然而,当我在现有协程中再启动子协程时,代码会卡在 co_await 的 async_write 处(在“while”部分)。我调用子协程的方式有什么问题吗?在父协程中用 while(1) 循环调用 async_write 可以正常工作,但在子协程中只能成功 1 次,下一次调用就会卡住。
这是我的客户端代码。
//
// Copyright (c) 2022 Klemens D. Morgenstern (klemens dot morgenstern at gmx dot net)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
//------------------------------------------------------------------------------
//
// Example: WebSocket client, coroutine
//
//------------------------------------------------------------------------------
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <string>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <thread>
#include <chrono>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------
// Sends a WebSocket message and prints the response
// Writes `text` to the WebSocket as a single message and then completes.
//
// Despite its name, this coroutine does not loop: it performs exactly one
// async_write and returns.
//
// ws   - WebSocket stream to write to. Taken by reference, so it must stay
//        alive until this coroutine has finished.
// text - message payload. Taken by value, so the coroutine holds its own copy
//        for the whole duration of the write.
net::awaitable<void> do_write_loop(auto& ws, std::string text) {
    std::cout << "before while";
    // `text` is already this coroutine's own copy, so it can back the buffer
    // directly; wrapping it in another temporary std::string only added a copy.
    co_await ws.async_write(net::buffer(text));
}
// Receives WebSocket messages in an endless loop and prints each one to
// stdout.
//
// The loop has no exit condition; the coroutine only ends when async_read
// reports a failure by throwing.
//
// ws - WebSocket stream to read from. Taken by reference, so it must stay
//      alive for as long as this coroutine runs.
net::awaitable<void> do_read_loop(auto& ws) {
    beast::flat_buffer buffer;
    while (true) {
        std::cout << "do_read_loop";
        co_await ws.async_read(buffer);
        std::cout << beast::make_printable(buffer.data()) << std::endl;
        // async_read appends to the buffer. Drop the message that was just
        // printed so the next iteration shows only the new message and the
        // buffer does not grow with every read.
        buffer.consume(buffer.size());
    }
}
// Runs one WebSocket client session: resolves host/port, connects, performs
// the WebSocket handshake, then starts a writer and a reader coroutine on the
// same stream.
//
// host - server name or address; the connected port is appended to it before
//        it is passed to the handshake.
// port - service name or port number to resolve.
// text - payload handed to do_write_loop.
net::awaitable<void> do_session(
std::string host,
std::string port,
std::string text)
{
// Resolver and WebSocket stream are both bound to this coroutine's executor
// and use use_awaitable as their default completion token.
auto resolver = net::use_awaitable.as_default_on(
tcp::resolver(co_await net::this_coro::executor));
auto ws = net::use_awaitable.as_default_on(
websocket::stream<beast::tcp_stream>(co_await net::this_coro::executor));
auto const results = co_await resolver.async_resolve(host, port);
// Apply a 30 second expiry on the underlying TCP stream for the connect.
beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));
auto ep = co_await beast::get_lowest_layer(ws).async_connect(results);
// host becomes "host:port"; this value is what async_handshake receives below.
host += ':' + std::to_string(ep.port());
// Disable the TCP stream expiry; the WebSocket timeout options set next
// are used from here on.
beast::get_lowest_layer(ws).expires_never();
ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
// Set the User-Agent field on the outgoing handshake request.
ws.set_option(websocket::stream_base::decorator(
[](websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-coro");
}));
co_await ws.async_handshake(host, "/");
std::cout << "after handshake";
//while (1) {
// co_await ws.async_write(net::buffer(std::string(text)));
//}
// NOTE(review): a second io_context is created here, but `ws` was constructed
// on this coroutine's executor above, not on ioc2.
net::io_context ioc2;
// NOTE(review): `executor` is obtained but not used anywhere below.
auto executor = co_await boost::asio::this_coro::executor;
// Start the write and read loops concurrently
net::co_spawn(ioc2,
do_write_loop(ws, text), [](std::exception_ptr e)
{
// Completion handler: print the message of any std::exception that ended
// the write coroutine.
if (e)
try
{
std::cout << "enter e";
std::rethrow_exception(e);
}
catch (std::exception& e)
{
std::cerr << "Error: " << e.what() << "\n";
}
});
net::co_spawn(ioc2,
do_read_loop(ws), [](std::exception_ptr e)
{
// Completion handler: print the message of any std::exception that ended
// the read coroutine.
if (e)
try
{
std::rethrow_exception(e);
}
catch (std::exception& e)
{
std::cerr << "Error: " << e.what() << "\n";
}
});
// NOTE(review): this is a blocking call made from inside the coroutine. While
// it blocks, this coroutine does not suspend, so the thread running it cannot
// process handlers of the context that `ws` belongs to -- presumably why the
// spawned coroutines stall; confirm.
ioc2.run();
//ioc3.run();
// Wait for both tasks to complete (this will never happen)
//co_await net::when_all(std::move(write_loop_task), std::move(read_loop_task));
// Close the WebSocket connection (this will never be reached)
co_await ws.async_close(websocket::close_code::normal);
}
//------------------------------------------------------------------------------
// Program entry point: runs a single WebSocket client session against a
// fixed endpoint with a fixed message.
//
// argc/argv are currently unused. The command-line form
// (<host> <port> <text>, taken from argv[1..3]) is disabled and the
// hard-coded values below are used instead.
//
// Returns EXIT_SUCCESS once the io_context has run out of work.
int main(int argc, char **argv)
{
    // Stand-ins for argv[1], argv[2] and argv[3].
    char const* const host = "127.0.0.1";
    char const* const port = "12345";
    char const* const text = "argv[3]";

    // Completion handler for the session coroutine: if the session ended
    // with a std::exception, print its message to stderr.
    auto report_failure = [](std::exception_ptr failure)
    {
        if (!failure)
            return;
        try
        {
            std::rethrow_exception(failure);
        }
        catch (std::exception const& ex)
        {
            std::cerr << "Error: " << ex.what() << "\n";
        }
    };

    // The io_context is required for all I/O.
    net::io_context ioc;

    // Launch the asynchronous session.
    net::co_spawn(ioc, do_session(host, port, text), report_failure);

    // Process I/O until no work is left.
    ioc.run();
    return EXIT_SUCCESS;
}
#else
// Fallback entry point, compiled only when BOOST_ASIO_HAS_CO_AWAIT is not
// defined: prints a notice and exits with status 1.
int main(int, char *[])
{
    std::fputs("awaitables require C++20\n", stdout);
    return 1;
}
#endif
您没有必要地创建了第二个 io_context,并且在协程内部对它进行了阻塞式的运行等待:
ioc2.run();
请改用当前协程自身的执行器:
auto ex = co_await boost::asio::this_coro::executor;
// Start the write and read loops concurrently
co_spawn(ex, do_write_loop(ws, text), rethrow);
co_spawn(ex, do_read_loop(ws), rethrow);
或者更好的做法是,使用 awaitable 运算符(awaitable_operators)来协调并行执行的工作:
co_await (do_write_loop(ws, text) && do_read_loop(ws));
请注意,给定的 `do_write_loop` 根本不是循环:它只执行一次写入,然后就会结束。
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/beast.hpp>
#include <iostream>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
using Resolver = net::deferred_t::as_default_on_t<tcp::resolver>;
using WebSock = net::deferred_t::as_default_on_t<websocket::stream<beast::tcp_stream>>;
using namespace std::chrono_literals;
using namespace net::experimental::awaitable_operators;
//------------------------------------------------------------------------------
// Sends a WebSocket message and prints the response
// Sends `text` over the WebSocket as one message and then finishes.
//
// Despite the name there is no loop here: a single async_write is awaited.
//
// ws   - stream to write to.
// text - message payload.
//
// Both parameters are references and the coroutine suspends while the write
// is in flight, so the caller must keep both objects alive until this
// coroutine has completed.
net::awaitable<void> do_write_loop(WebSock& ws, std::string const& text) {
    std::cout << "before while" << std::endl;

    // Non-owning view over the caller's string.
    auto const payload = net::buffer(text);
    co_await ws.async_write(payload);
}
// Receives WebSocket messages in an endless loop and prints each one to
// stdout.
//
// The loop has no exit condition; the coroutine only ends when async_read
// reports a failure by throwing.
//
// ws - stream to read from. Taken by reference, so it must stay alive for as
//      long as this coroutine runs.
net::awaitable<void> do_read_loop(WebSock& ws) {
    beast::flat_buffer buffer;
    while (true) {
        std::cout << "do_read_loop" << std::endl;
        co_await ws.async_read(buffer);
        std::cout << beast::make_printable(buffer.data()) << std::endl;
        // async_read appends to the buffer. Drop the message that was just
        // printed so the next iteration shows only the new message and the
        // buffer does not grow with every read.
        buffer.consume(buffer.size());
    }
}
// Runs one WebSocket client session on the current coroutine's executor:
// resolve, connect, handshake, then run the writer and the reader side by
// side on the same stream.
//
// host - server name or address; the connected port is appended before the
//        handshake.
// port - service name or port number to resolve.
// text - payload handed to do_write_loop.
net::awaitable<void> do_session(std::string host, std::string port, std::string text) {
    auto ex = co_await net::this_coro::executor;
    WebSock ws(ex);
    auto& tcp_layer = beast::get_lowest_layer(ws);

    // The 30 s expiry is armed before resolving, exactly as the connect
    // sequence requires; it is cleared again once the TCP connection is up.
    tcp_layer.expires_after(30s);
    auto const endpoints = co_await Resolver(ex).async_resolve(host, port);
    auto const ep = co_await tcp_layer.async_connect(endpoints);
    tcp_layer.expires_never();

    std::cout << "TCP connected to " << ep << std::endl;
    host += ':' + std::to_string(ep.port());

    // WebSocket-level options: suggested client timeouts and a User-Agent
    // field on the handshake request.
    ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
    ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
        req.set(http::field::user_agent, "my-client-1.0");
    }));

    co_await ws.async_handshake(host, "/");
    std::cout << "after handshake" << std::endl;

    // Run writer and reader concurrently and wait for both.
    co_await (do_write_loop(ws, text) && do_read_loop(ws));

    // Not reached in practice: do_read_loop has no normal exit, it only ends
    // by throwing.
    co_await ws.async_close(websocket::close_code::normal);
}
//------------------------------------------------------------------------------
// Completion handler for co_spawn: reports a failed coroutine on stderr.
//
// ep - exception that ended the coroutine, or null if it finished normally.
//
// A null `ep` is ignored. Otherwise the exception is rethrown locally; if it
// derives from std::exception its what() text is printed, while any other
// exception type is not caught here and propagates to the caller.
static inline void rethrow(std::exception_ptr ep) {
    if (!ep)
        return;

    try {
        std::rethrow_exception(ep);
    } catch (std::exception const& e) {
        std::cerr << "\n rethrow_exception: " << e.what() << "\n";
    }
}
// Program entry point: spawns one client session with fixed connection
// parameters and runs the io_context until it has no more work.
// Failures of the session are reported through `rethrow`.
int main() {
    char const* const host = "127.0.0.1";
    char const* const port = "12345";
    char const* const text = "hello world";

    net::io_context ioc;
    net::co_spawn(ioc, do_session(host, port, text), rethrow);
    ioc.run();
}