DoBoost :: Asio C ++ 20 Coroutines支持多线程?

问题描述 投票:0回答:1
DoBoost :: Asio C ++ 20 Coroutines支持多线程?

BOOST :: ASIO文档示例都是单线程,是否有多线程示例?

c++ boost-asio c++20 c++-coroutine
1个回答
4
投票

yes

在ASIO,如果多个线程运行执行上下文,您通常也不会控制哪个线程恢复您的coroutine。 您可以查看一些询问如何切换执行者中间执行者的答案(控制哪些或执行上下文可能会恢复Coro):

Aasio如何更改已久的执行者?


要使

C++ 20 Coro Echo Server样品示例多线程您可以更改2行:

boost::asio::io_context io_context(1); // ... io_context.run(); into

boost::asio::thread_pool io_context;
// ...
io_context.join();

由于每个Coro是一个隐式(或逻辑上)的链,因此无需其他。注意:

除非您在Coroutines内部进行重要的工作,否则这可能是没有用的,这会减慢单个线程的IO多路复用。
在练习中,单个线程可以轻松处理10K并发连接,尤其是使用C ++ 20 Coroutines。

注意,用并发提示运行

asio::io_context(1)
    可能是一个重要的性能
  • gain
  • ,因为它可以避免在开销上同步。
  • 当您介绍例如异步会话控件或全双工您需要明确的链。在下面的示例中,我显示您将如何使每个“会话”使用链,例如进行优雅的关闭
  • live在coliru
  • #include <boost/asio.hpp>
    #include <boost/asio/co_spawn.hpp>
    #include <boost/asio/experimental/awaitable_operators.hpp>
    #include <iostream>
    #include <list>
    
    namespace asio = boost::asio;
    namespace this_coro = asio::this_coro;
    using boost::system::error_code;
    using asio::ip::tcp;
    using asio::detached;
    using executor_type = asio::any_io_executor;
    using socket_type   = asio::use_awaitable_t<>::as_default_on_t<tcp::socket>; // or tcp::socket
                                                                                 //
    using session_state = std::shared_ptr<socket_type>;                          // or any additional state
    using handle        = std::weak_ptr<session_state::element_type>;
    
    using namespace std::string_view_literals;
    using namespace asio::experimental::awaitable_operators;
    
    asio::awaitable<void> echo_session(session_state s) {
        try {
            for (std::array<char, 1024> data;;) {
                size_t n = co_await s->async_read_some(asio::buffer(data));
                co_await async_write(*s, asio::buffer(data, n));
            }
        } catch (boost::system::system_error const& se) {
            if (se.code() != asio::error::operation_aborted) // expecting cancellation
                throw;
        } catch (std::exception const& e) {
            std::cout << "echo Exception: " << e.what() << std::endl;
            co_return;
        }
    
        error_code ec;
        co_await async_write(*s, asio::buffer("Server is shutting down\n"sv),
                             redirect_error(asio::use_awaitable, ec));
    
        // std::cout << "echo shutdown: " << ec.message() << std::endl;
    }
    
    asio::awaitable<void> listener(std::list<handle>& sessions) {
        auto ex = co_await this_coro::executor;
    
        for (tcp::acceptor acceptor(ex, {tcp::v4(), 55555});;) {
            session_state s = std::make_shared<socket_type>(
                co_await acceptor.async_accept(make_strand(ex), asio::use_awaitable));
    
            sessions.remove_if(std::mem_fn(&handle::expired)); // "garbage collect", optional
            sessions.emplace_back(s);
    
            co_spawn(ex, echo_session(s), detached);
        }
    }
    
    int main() {
        std::list<handle> handles;
    
        asio::thread_pool io_context;
        asio::signal_set signals(io_context, SIGINT, SIGTERM);
    
        auto handler = [&handles](std::exception_ptr ep, auto result) {
            try {
                if (ep)
                    std::rethrow_exception(ep);
    
                int signal = get<1>(result);
                std::cout << "Signal: " << ::strsignal(signal) << std::endl;
                for (auto h : handles)
                    if (auto s = h.lock()) {
                        // more logic could be implemented via members on a session_state struct
                        std::cout << "Shutting down live session " << s->remote_endpoint() << std::endl;
                        post(s->get_executor(), [s] { s->cancel(); });
                    }
            } catch (std::exception const& e) {
                std::cout << "Server: " << e.what() << std::endl;
            }
        };
    
        co_spawn(io_context, listener(handles) || signals.async_wait(asio::use_awaitable), handler);
    
        io_context.join();
    }
    
  • Online演示和当地演示:

请注意,在线程之间迁移的等待问题可能发生的问题
https://github.com/chriskohlhoff/asio/issues/1366,1.
    

© www.soinside.com 2019 - 2025. All rights reserved.