我可以并行运行 N 个 boost::asio::awaitable 协同例程(或协同例程向量)并附加截止时间计时器吗?

问题描述 投票:0回答:1

我正在尝试执行一堆 boost::asio::awaitable 协同例程,但我不知道运行时的确切数量(它根据条件而变化)。我需要它们并行运行,并且如果整个操作超时,我需要它们全部取消并返回错误。

我想出了以下代码+伪代码来演示我想要做什么。

注意:请随时批评我的截止日期计时器想法。我想不出任何更干净的东西。

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/detached.hpp>  

#include <iostream>
#include <chrono>
#include <system_error>

using namespace std::chrono_literals;
using namespace boost::asio::experimental::awaitable_operators;

boost::asio::awaitable<void> do_work( int index, std::chrono::steady_clock::time_point tp )
{
    std::cout << "Starting work." << std::endl;

    boost::asio::steady_timer timer( co_await boost::asio::this_coro::executor );
    timer.expires_at( tp );
    co_await timer.async_wait( boost::asio::deferred );

    std::cout << "Work " << index << " finished!" << std::endl;
}

boost::asio::awaitable<void> test()
{
    try
    {
        auto executor  = co_await boost::asio::this_coro::executor;

        boost::asio::steady_timer deadline_timer( executor );
        
        auto schedule_deadline = [ &deadline_timer ]( std::chrono::steady_clock::time_point tp ) -> boost::asio::awaitable<void>
        {
            std::cout << "Starting deadline" << std::endl;
            try
            {
                deadline_timer.expires_at( tp );
                co_await deadline_timer.async_wait( boost::asio::deferred  );
                throw boost::system::system_error( std::make_error_code( std::errc::timed_out ) );
            }
            catch( const boost::system::system_error& e )
            {
                if( e.code() == boost::asio::error::operation_aborted )
                    std::cout << "Timer was canceled." << std::endl;
                else
                    std::cerr << "Unexpected error: " << e.what() << std::endl;
            }

            std::cout << "Ending deadline" << std::endl;
        };

        auto execute_coro = [ &deadline_timer ]( auto &&coro ) -> boost::asio::awaitable<void>
        {
            co_await std::forward<decltype(coro)>(coro);

            deadline_timer.cancel();
        };

        std::cout << "Scenario 1." << std::endl;

        auto now = std::chrono::steady_clock::now();


        // Pesudo code from here!!!
        // Collection of N co-routine tasks
        std::vector<awaitables> coros;

        // Add these all in a loop (no loop in this example)
        coros.push_back( do_work( 1, now + 1s ) );
        coros.push_back( do_work( 2, now + 2s ) );
        coros.push_back( do_work( 3, now + 3s ) );

        // Await them all and let them execute in parallel
        auto all_coros = parallel_coros_group( coros );

        co_await (
            // Wrap in a timer canceling function
            execute_coro( all_coros ) &&
            // Schedule a deadline
                schedule_deadline( now + 5s )
        );

    }
    catch( const std::exception &ex )
    {
        std::cout << "Exception: " << ex.what() << std::endl;
    }
}

int main()
{
    boost::asio::thread_pool ioc(1);

    co_spawn(
        ioc,
        test(),
        boost::asio::detached
    );

    ioc.join();
}

这样的事情可能吗?如何做到?

c++ c++20 boost-asio boost-coroutine deadline-timer
1个回答
0
投票

您可以使用范围并行组:

asio::awaitable<void> scenario(unsigned n) try {
    auto ex = co_await asio::this_coro::executor;

    using Task = decltype(co_spawn(ex, do_work(1, 1s), asio::deferred));
    std::vector<Task> tasks;
    for (unsigned i = 1; i <= n; ++i)
        tasks.push_back(co_spawn(ex, do_work(i, i * 1s), asio::deferred));

    // Await them all and let them execute in parallel
    auto grp = asio::experimental::make_parallel_group(std::move(tasks));

    auto rr = co_await ( //
        grp.async_wait(asio::experimental::wait_for_all(), asio::use_awaitable) ||
        asio::steady_timer(ex, 5s).async_wait(asio::use_awaitable) //
    );

    if (rr.index() == 0) {
        std::cout << "All tasks finished!" << std::endl;
    } else {
        std::cout << "Timeout!" << std::endl;
    }

} catch (std::exception const& ex) {
    std::cout << "Exception: " << ex.what() << std::endl;
}

观看包含两种场景的现场演示:

住在Coliru

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>

namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;

asio::awaitable<void> do_work(int index, std::chrono::steady_clock::duration d) {
    std::cout << "Starting work." << std::endl;

    co_await asio::steady_timer(co_await asio::this_coro::executor, d).async_wait(asio::deferred);

    std::cout << "Work " << index << " finished!" << std::endl;
}

asio::awaitable<void> scenario(unsigned n) try {
    auto ex = co_await asio::this_coro::executor;

    using Task = decltype(co_spawn(ex, do_work(1, 1s), asio::deferred));
    std::vector<Task> tasks;
    for (unsigned i = 1; i <= n; ++i)
        tasks.push_back(co_spawn(ex, do_work(i, i * 1s), asio::deferred));

    // Await them all and let them execute in parallel
    auto grp = asio::experimental::make_parallel_group(std::move(tasks));

    auto rr = co_await ( //
        grp.async_wait(asio::experimental::wait_for_all(), asio::use_awaitable) ||
        asio::steady_timer(ex, 5s).async_wait(asio::use_awaitable) //
    );

    if (rr.index() == 0) {
        std::cout << "All tasks finished!" << std::endl;
    } else {
        std::cout << "Timeout!" << std::endl;
    }

} catch (std::exception const& ex) {
    std::cout << "Exception: " << ex.what() << std::endl;
}

asio::awaitable<void> test() {
    std::cout << "Scenario 1." << std::endl;
    co_await scenario(3);

    std::cout << "Scenario 2." << std::endl;
    co_await scenario(6);
}

int main() {
    asio::thread_pool ioc(1);

    co_spawn(ioc, test(), asio::detached);

    ioc.join();
}

印刷:

Scenario 1.
Starting work.
Starting work.
Starting work.
Work 1 finished!
Work 2 finished!
Work 3 finished!
All tasks finished!
Scenario 2.
Starting work.
Starting work.
Starting work.
Starting work.
Starting work.
Starting work.
Work 1 finished!
Work 2 finished!
Work 3 finished!
Work 4 finished!
Work 5 finished!
Timeout!
© www.soinside.com 2019 - 2024. All rights reserved.