我正在尝试将 boost 单线程应用程序扩展到多线程应用程序。该代码基于这篇文章。我采取的方法是为每个线程提供专用的 IO_Context。在我的应用程序中,我需要将 ASIO JOB 从其他线程发布到另一个线程的 IO_Context。根据 boost article,这应该可以工作,但我的应用程序在几次迭代后因分段错误而崩溃。
#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;
static std::atomic_int tid_gen = 0;
thread_local int const tid = [] { return ++tid_gen; }();
static constexpr auto now = std::chrono::steady_clock::now;
static auto const start = now();
static std::mutex console_mx;
void trace(auto const&... msg) {
std::lock_guard lk(console_mx);
std::cerr << "at " << std::setw(8) << (now() - start)/1ms << "ms - tid:" << tid << " ";
(std::cerr << ... << msg) << std::endl;
}
void worker(asio::io_context& ioContext) {
trace("Worker thread enter");
ioContext.run(); // Run the io_context to handle asynchronous operations
trace("Worker thread exit");
}
int main() {
try {
asio::io_context ioContext1;
asio::io_context ioContext2;
std::function<void(const boost::system::error_code& error)> handler1;
std::function<void(const boost::system::error_code& error)> handler2;
asio::steady_timer task1(ioContext1, 100ms);
asio::steady_timer task2(ioContext2, 200ms);
handler1 = [&task1, &handler1, &ioContext2](error_code ec) {
//trace("Start Task1: ", ec.message());
if (!ec)
usleep(5000);
asio::post(ioContext2, []{
trace("Task1 posted job on Task2");
});
task1.async_wait(handler1);
};
task1.async_wait(handler1);
handler2 = [&task2, &handler2, &ioContext1](error_code ec) {
//trace("Start Task2: ", ec.message());
if (!ec)
usleep(10000);
asio::post(ioContext1, []{
trace("Task2 posted job on Task1");
});
task2.async_wait(handler2);
};
task2.async_wait(handler2);
// Create a work object to prevent ioContext.run() from returning immediately
auto work1 = make_work_guard(ioContext1);
auto work2 = make_work_guard(ioContext2);
// Create multiple worker threads
std::vector<std::thread> threads;
threads.emplace_back(worker, std::ref(ioContext1));
threads.emplace_back(worker, std::ref(ioContext2));
trace("App started :", std::thread::hardware_concurrency());
work1.reset();
work2.reset();
// Join the worker threads
for (auto& thread : threads) {
thread.join();
}
trace("All worker threads joined.");
} catch (std::exception const& e) {
trace("Exception: ", std::quoted(e.what()));
}
}
这是回溯轨迹
Thread 3 "application" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x763de380 (LWP 29973)]
boost::asio::detail::scheduler::do_run_one (this=this@entry=0x42d188, lock=..., this_thread=..., ec=...) at /usr/include/boost/asio/detail/impl/scheduler.ipp:458
有谁知道这种方法是否应该有效或预计会失败? 我尝试在发布职位之前添加单独的锁,但这也没有帮助。
任何其他在 IO_Context 之间发布作业的建议也将非常有用。 我在嵌入式 Linux 5.15 平台上使用 boost 版本 1.79,该平台在双核 Cortex A7 硬件上运行。
我没有发现发布的代码有问题。
我注意到一些问题:计时器...没用,因为所有 async_wait 都会立即完成(第一个除外)。
工作警卫没有增加任何价值(因为总是有工作)。
我将其简化为这个。
您还缺少
函数中的异常处理(应该捕获 boost::asio::io_service::run() 抛出的异常吗?)。我将把它作为练习留给读者。worker
唯一引用timer1的代码在loop1中。 唯一引用timer2的代码在loop2中。
loop1/loop2 是隐式链(它们仅从自己的完成处理程序连续执行链接)。
ioc1
和ioc2
是共享的,但是记录的线程安全(假设有足够的生命周期,即除了构造/销毁),还有一些与这里不相关的例外:
共享对象:安全,但
和restart()
函数除外。在有未完成的 run()、notify_fork()
、run_one()
、run_for()
、run_until()
或poll()
调用时调用 restart() 会导致未定义的行为。在另一个线程中调用任何poll_one()
函数或与notify_fork()
关联的 I/O 对象上的任何函数时,不应调用io_context
函数。io_context
如果您希望异步计时器定期重新安排,我之前已经做了一些示例来封装该逻辑,这样您就不需要重复这么多代码。
此外,它应该将计时器与执行上下文分离,因为如果需要的话,在单个上下文/线程上运行多个计时器是完全有效的。当然,如果任务占用大量 CPU,您不应该从(同一)服务线程调用它们。
请参阅 Asio async_wait 给出操作已取消以获取完整说明。
为了弥补异常处理的缺乏以及许多(冗余)上下文的杂乱,我建议使用共享上下文。现在,您仍然可以通过使用链确保所有处理程序在特定“任务”上“序列化”。
当您说“所以我想要对锁做的是,在发布作业之前获取 io_context 独有的锁,但这没有帮助” . 它没有帮助的原因是
你想要序列化处理程序
调用
- ,而不是排队的时间 您无法对异步调用持有锁,因为它破坏了执行程序抽象:执行程序可能会导致在不同线程上调用处理程序,而不会持有锁。理论上,您可以在仅移动处理程序对象中保留一个
unique_lock
- ,但我认为 99% 的情况下这是一种强烈的代码味道(也是导致死锁的原因)
住在Coliru
static constexpr auto unit = 1000ms;
asio::thread_pool ioc(2); // or leave the argument to get a system-dependent default size
std::function<void(error_code error)> loop1, loop2;
auto make_loop = [](auto& loop, int id, auto ex, auto otherEx) {
auto interval = id * unit;
loop = [=, &loop, t = std::make_shared<asio::steady_timer>(ex, interval)](error_code ec) {
trace("Start Task", id, ": ", ec.message());
if (!ec)
sleep_for(interval / 2);
asio::post(otherEx, [id] { trace("job cross-posted from Task", id); });
t->expires_from_now(interval);
t->async_wait(loop);
};
asio::post(ex, bind(loop, error_code{})); // prime the pump
};
auto task1 = make_strand(ioc);
auto task2 = make_strand(ioc);
make_loop(loop1, 1, task1, task2);
make_loop(loop2, 2, task2, task1);
trace("App started :", std::thread::hardware_concurrency());
ioc.join();
印刷
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -DCOLIRU && ./a.out
at 0ms - tid:0 main
at 0ms - tid:1 Start Task1: Success
at 0ms - tid:0 App started :4
at 0ms - tid:2 Start Task2: Success
at 1000ms - tid:1 job cross-posted from Task2
at 1000ms - tid:2 job cross-posted from Task1
at 1500ms - tid:1 Start Task1: Success
at 2001ms - tid:1 job cross-posted from Task1
at 3000ms - tid:2 Start Task2: Success
at 3001ms - tid:1 Start Task1: Success
at 4001ms - tid:2 job cross-posted from Task2
at 4001ms - tid:1 job cross-posted from Task1
at 4501ms - tid:2 Start Task1: Success
at 5001ms - tid:2 job cross-posted from Task1
at 6001ms - tid:1 Start Task2: Success
at 6001ms - tid:2 Start Task1: Success
at 7001ms - tid:1 job cross-posted from Task2
at 7001ms - tid:2 job cross-posted from Task1
at 7502ms - tid:1 Start Task1: Success
at 8002ms - tid:1 job cross-posted from Task1
at 9001ms - tid:2 Start Task2: Success
at 9002ms - tid:1 Start Task1: Success
at 10001ms - tid:0 All worker threads joined.
请注意,这实际上比您最初的重现少了 28 行代码 - 33% 吸脂率。
总结/结论