我正在使用 C++20 协程编写消息代理。代理有一些会话(连接)。 经纪人传递的消息如下:
sender(ss) broker receiver(sr)
| | |
| message | |
|-------------->| |
| | message |
| |---------->|
| | |
发送过程由发送者会话(ss)触发。在此过程中,需要访问接收者会话(ss)。 ss 和 sr 可以在不同的线程和不同的strand上工作。如何很好地切换股线?
#include <boost/asio.hpp>
#include <iostream>
#include <syncstream>
#include <thread>
#include <cassert>
namespace as = boost::asio;
template <typename Executor>
struct session {
session(Executor exe):str_{exe} {}
void access() const {
std::osyncstream(std::cout) << std::this_thread::get_id() << " access()" << std::endl;
assert(str_.running_in_this_thread());
}
as::strand<Executor> str_;
};
int main() {
as::io_context iocs;
as::io_context iocr;
auto guards = as::make_work_guard(iocs.get_executor());
auto guardr = as::make_work_guard(iocr.get_executor());
session ss{iocs.get_executor()};
session sr{iocr.get_executor()};
co_spawn(
ss.str_, // sender's strand
[&] () -> as::awaitable<void> {
std::osyncstream(std::cout)
<< std::this_thread::get_id()
<< " sender start deliver"
<< std::endl;
// need to access to receiver's resource to deliver
#if 0
sr.access(); // assertion failed because this coroutine is
// working on sender's strand, not receiver's one
#endif
// so dispatch to the receiver's strand
co_await as::dispatch(sr.str_, as::use_awaitable);
#if 0
// B
sr.access(); // assertion failed because this coroutine is
// working on sender's strand, not receiver's one
#endif
// so dispatch to the receiver's strand
co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));
sr.access(); // working on receiver's thread.
// This is expected behavior.
guards.reset();
guardr.reset();
co_return;
},
as::detached
);
std::thread ts {
[&] {
std::osyncstream(std::cout) << std::this_thread::get_id() << " sender thread" << std::endl;
iocs.run();
}
};
std::thread tr {
[&] {
std::osyncstream(std::cout) << std::this_thread::get_id() << " receiver thread" << std::endl;
iocr.run();
}
};
ts.join();
tr.join();
}
godbolt 链接:https://godbolt.org/z/z8r6fYjj3
session
具有链str_
和access()
功能。 access()
函数期望在 str_
上调用该函数以消除资源锁定。
co_spawn
本身正在由发送者链工作。
导致断言失败。这是显而易见的,因为代码在发送者的链上工作,而不是接收者的链。
仍然导致断言失败。 我不清楚。我猜想该进程已切换到接收者的线程,但 co_await 切换回发送者的线程,因为 co_spawn 的执行器是发送者的线程。它用作默认执行器。这只是我的猜测。
它按我的预期工作。
// so dispatch to the receiver's strand
co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));
sr.access(); // working on receiver's thread.
// This is expected behavior.
即使 co_spawn 的执行者是发送者的链,co_await 之后的代码部分也在接收者的链上工作。
但是,我不确定它是否安全。我怀疑这可能是未定义行为的一种表现。
如果安全的话。我想使用这种方法。
我阅读了字符服务器的代码示例。
https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/example/cpp20/coroutines/chat_server.cpp 这很有趣。该会话有两个协程:reader() 和 writer()。 writer() 的 async_write() 是通过取消从未触发的计时器来触发的。这有点棘手。在使用队列完成 aync 过程之前,可以很好地避免连续的 async_write() 调用。
但是,就我而言,会话内部已经有排队机制。所以这个技巧对我来说太过分了。
以下方法安全吗?我是不是错过了什么重要的事情?
co_spawn(
base_strand,
coroutine_function
);
as::awaitable<void>
coroutine_function() {
// basically working on base_strand.
// ...
co_await some_async_func(..., bind_executor(strand1, as::use_awaitable));
// this part is working on strand1
// ...
co_await some_async_func(..., bind_executor(strand2, as::use_awaitable));
// this part is working on strand2
// ...
co_await some_async_func(..., as::use_awaitable);
// this part is working on base_strand
// ...
}
co_await as::dispatch(sr.str_, as::use_awaitable);
co_await as::dispatch(as::bind_executor(sr.str_, as::use_awaitable));
为什么第一个代码无法在
sr.str_
上运行?
Q1 - 是的,对于 Asio 来说是安全的。协程是隐式链,因为恢复是处理程序的线性链。您可以在任何地方运行它们,包括在线束上。
Q2 - 这是一个经常发生的混乱,例如使用 boost asio 链作为“互斥体”不适用于协程
另请参阅这些解释: