我正在尝试执行一堆 boost::asio::awaitable 协同例程,但我不知道运行时的确切数量(它根据条件而变化)。我需要它们并行运行,并且如果整个操作超时,我需要它们全部取消并返回错误。
我想出了以下代码+伪代码来演示我想要做什么。
注意:请随时批评我的截止日期计时器想法。我想不出任何更干净的东西。
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/detached.hpp>
#include <iostream>
#include <chrono>
#include <system_error>
using namespace std::chrono_literals;
using namespace boost::asio::experimental::awaitable_operators;
boost::asio::awaitable<void> do_work( int index, std::chrono::steady_clock::time_point tp )
{
std::cout << "Starting work." << std::endl;
boost::asio::steady_timer timer( co_await boost::asio::this_coro::executor );
timer.expires_at( tp );
co_await timer.async_wait( boost::asio::deferred );
std::cout << "Work " << index << " finished!" << std::endl;
}
boost::asio::awaitable<void> test()
{
try
{
auto executor = co_await boost::asio::this_coro::executor;
boost::asio::steady_timer deadline_timer( executor );
auto schedule_deadline = [ &deadline_timer ]( std::chrono::steady_clock::time_point tp ) -> boost::asio::awaitable<void>
{
std::cout << "Starting deadline" << std::endl;
try
{
deadline_timer.expires_at( tp );
co_await deadline_timer.async_wait( boost::asio::deferred );
throw boost::system::system_error( std::make_error_code( std::errc::timed_out ) );
}
catch( const boost::system::system_error& e )
{
if( e.code() == boost::asio::error::operation_aborted )
std::cout << "Timer was canceled." << std::endl;
else
std::cerr << "Unexpected error: " << e.what() << std::endl;
}
std::cout << "Ending deadline" << std::endl;
};
auto execute_coro = [ &deadline_timer ]( auto &&coro ) -> boost::asio::awaitable<void>
{
co_await std::forward<decltype(coro)>(coro);
deadline_timer.cancel();
};
std::cout << "Scenario 1." << std::endl;
auto now = std::chrono::steady_clock::now();
// Pesudo code from here!!!
// Collection of N co-routine tasks
std::vector<awaitables> coros;
// Add these all in a loop (no loop in this example)
coros.push_back( do_work( 1, now + 1s ) );
coros.push_back( do_work( 2, now + 2s ) );
coros.push_back( do_work( 3, now + 3s ) );
// Await them all and let them execute in parallel
auto all_coros = parallel_coros_group( coros );
co_await (
// Wrap in a timer canceling function
execute_coro( all_coros ) &&
// Schedule a deadline
schedule_deadline( now + 5s )
);
}
catch( const std::exception &ex )
{
std::cout << "Exception: " << ex.what() << std::endl;
}
}
int main()
{
boost::asio::thread_pool ioc(1);
co_spawn(
ioc,
test(),
boost::asio::detached
);
ioc.join();
}
这样的事情可能吗?如何做到?
您可以使用范围并行组:
asio::awaitable<void> scenario(unsigned n) try {
auto ex = co_await asio::this_coro::executor;
using Task = decltype(co_spawn(ex, do_work(1, 1s), asio::deferred));
std::vector<Task> tasks;
for (unsigned i = 1; i <= n; ++i)
tasks.push_back(co_spawn(ex, do_work(i, i * 1s), asio::deferred));
// Await them all and let them execute in parallel
auto grp = asio::experimental::make_parallel_group(std::move(tasks));
auto rr = co_await ( //
grp.async_wait(asio::experimental::wait_for_all(), asio::use_awaitable) ||
asio::steady_timer(ex, 5s).async_wait(asio::use_awaitable) //
);
if (rr.index() == 0) {
std::cout << "All tasks finished!" << std::endl;
} else {
std::cout << "Timeout!" << std::endl;
}
} catch (std::exception const& ex) {
std::cout << "Exception: " << ex.what() << std::endl;
}
观看包含两种场景的现场演示:
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
asio::awaitable<void> do_work(int index, std::chrono::steady_clock::duration d) {
std::cout << "Starting work." << std::endl;
co_await asio::steady_timer(co_await asio::this_coro::executor, d).async_wait(asio::deferred);
std::cout << "Work " << index << " finished!" << std::endl;
}
asio::awaitable<void> scenario(unsigned n) try {
auto ex = co_await asio::this_coro::executor;
using Task = decltype(co_spawn(ex, do_work(1, 1s), asio::deferred));
std::vector<Task> tasks;
for (unsigned i = 1; i <= n; ++i)
tasks.push_back(co_spawn(ex, do_work(i, i * 1s), asio::deferred));
// Await them all and let them execute in parallel
auto grp = asio::experimental::make_parallel_group(std::move(tasks));
auto rr = co_await ( //
grp.async_wait(asio::experimental::wait_for_all(), asio::use_awaitable) ||
asio::steady_timer(ex, 5s).async_wait(asio::use_awaitable) //
);
if (rr.index() == 0) {
std::cout << "All tasks finished!" << std::endl;
} else {
std::cout << "Timeout!" << std::endl;
}
} catch (std::exception const& ex) {
std::cout << "Exception: " << ex.what() << std::endl;
}
asio::awaitable<void> test() {
std::cout << "Scenario 1." << std::endl;
co_await scenario(3);
std::cout << "Scenario 2." << std::endl;
co_await scenario(6);
}
int main() {
asio::thread_pool ioc(1);
co_spawn(ioc, test(), asio::detached);
ioc.join();
}
印刷:
Scenario 1.
Starting work.
Starting work.
Starting work.
Work 1 finished!
Work 2 finished!
Work 3 finished!
All tasks finished!
Scenario 2.
Starting work.
Starting work.
Starting work.
Starting work.
Starting work.
Starting work.
Work 1 finished!
Work 2 finished!
Work 3 finished!
Work 4 finished!
Work 5 finished!
Timeout!