保存 boost asio 异步操作完成处理程序

问题描述 投票:0回答:1

我需要使用 boost asio 库实现自定义异步操作。该操作本身将由第三方库执行。

我遵循的方法是:

  1. 通过
    boost::asio::async_initiate
    启动异步操作。为此,我正在实现启动功能
  2. 存储异步完成处理程序,由
    boost::asio::async_initiate
    传递给启动函数
  3. 启动第3方异步操作
  4. 退出启动功能
    根据boost参考启动函数应该是非阻塞的。
  5. 当异步操作完成时,调用之前存储的异步完成处理程序

复制代码如下。这里的第 3 方库是用单独的线程和布尔标志来模拟的:

#include <iostream>
#include <string>
#include <thread>
#include <functional>
#include <atomic>
#include <boost/asio.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/co_spawn.hpp>

std::atomic<bool> asyncOpStarted = false;

using signature = void(std::string);
std::function<signature> completionHandler;

// Dummy implementation to simulate the async operation
void start_operation()
{
    asyncOpStarted = true;
    std::cout << "Async operation starting..." << std::endl;
}

void do_async_loop()
{
    while (!asyncOpStarted) {
        // sleep & check if operation started
        std::cout << "Waiting for async start..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    };

    std::cout << "Async operation started..." << std::endl;
    // simualate delay
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Async operation finished. Invoke completion handler..." << std::endl;
    completionHandler("Done");
}

// Async operation implementation
template <typename CompletionToken>
auto async_do_operation(boost::asio::io_context& io, CompletionToken&& token)
{  
    return boost::asio::async_initiate<CompletionToken, signature>(
        [&](auto&& handler) {
            // I need to store "handler" here

            // This doesn't work. std::function<...> requires handler to be copyable:
            //completionHandler = [h = std::move(handler)](std::string res) mutable {
            //    h(res);
            //};

            start_operation();
        }, 
        std::move(token)
    );
}

int main()
{
    std::thread t(do_async_loop);

    boost::asio::io_context io;
    
    boost::asio::co_spawn(io, [&]() -> boost::asio::awaitable<void> {
        try {
            // Use `boost::asio::use_awaitable` as the completion token
            auto result = co_await async_do_operation(io, boost::asio::use_awaitable);
            std::cout << "Coroutine: " << result << std::endl;
        } catch (...) {
            std::cerr << "Exception caught" << std::endl;
        }
    }, boost::asio::detached);

    // Run the io_context to process async events
    io.run();
  
}

我在步骤(2)中失败了。无法以任何方式存储完成处理程序。 我尝试了以下方法:

  1. 使用
    std::function
    并将完成处理程序移动到捕获列表中
    这不起作用,因为
    std::function
    要求处理程序可复制
  2. 创建指向完成处理程序的共享指针,然后可以将其捕获到 lambda。
  3. 完成处理程序直接存储到变量中。

所以,我的问题是:

  1. 我是否以正确的方式实施集成?
  2. 如果是,boost asio 中在操作运行时存储完成处理程序的设计方法是什么?

提前致谢。

c++ boost asio c++-coroutine
1个回答
0
投票

您可以使用相对较新的

asio::any_completion_handler
类型擦除容器。

这是一个简化版本,还修复了一些问题:

  • std::forward
    而不是
    std::move
  • 合成处理程序上缺少
    std::move
    (在
    async_initiate
    中)
  • 缺少螺纹连接
  • 不要处理对执行上下文的引用,而是通过传递执行器(甚至检测当前协程的执行器)来解耦。

住在Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <string>
#include <thread>
namespace asio = boost::asio;

std::atomic<bool> asyncOpStarted = false;

using signature = void(std::string);
asio::any_completion_handler<signature> completionHandler;

// Dummy implementation to simulate the async operation
void start_operation() {
    asyncOpStarted = true;
    std::cout << "Async operation starting..." << std::endl;
}

void do_async_loop() {
    while (!asyncOpStarted) {
        // sleep & check if operation started
        std::cout << "Waiting for async start..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    };

    std::cout << "Async operation started..." << std::endl;
    // simualate delay
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Async operation finished. Invoke completion handler..." << std::endl;
    completionHandler("Done");
}

template <typename Token> //
auto async_do_operation(asio::any_io_executor ex, Token&& token) {
    return asio::async_initiate<Token, signature>(
        [&](auto&& handler) {
            completionHandler =                                         //
                [w = make_work_guard(ex), handler = std::move(handler)] //
                (std::string res) mutable                               //
            {                                                           //
                std::move(handler)(std::move(res));
            };

            start_operation();
        },
        std::forward<Token>(token));
}

asio::awaitable<void> coro() try {
    auto ex = co_await asio::this_coro::executor;
    std::cout << "Coroutine: " << co_await async_do_operation(ex, asio::deferred) << std::endl;
} catch (...) {
    std::cerr << "Exception caught" << std::endl;
}

int main() {
    std::thread t(do_async_loop);

    asio::io_context io;

    co_spawn(io, coro, asio::detached);

    io.run();

    t.join();
}

输出:

Waiting for async start...
Async operation starting...
Async operation started...
Async operation finished. Invoke completion handler...
Coroutine: Done
© www.soinside.com 2019 - 2024. All rights reserved.