我的程序在尝试使用 boost asio 通道将数据从生产者发送到消费者时阻塞。
async_send 方法不是异步的。文档说:此方法异步发送数据。
我尝试使用 boost asio 通道在线程之间发送数据来实现观察者设计模式。
但是我有点惊讶 async_send 的行为。
struct Subject
{
using Channel = asio::experimental::concurrent_channel<void(
std::error_code, std::shared_ptr<Response>)>;
std::list<Channel> channels;
};
asio::awaitable<void> for_each_channel(Subject& subject, auto action)
{
for(auto it = subject.channels.begin();
it != subject.channels.end();)
{
if(it->is_open())
{
co_await action(*it);
++it;
} else
{
it = subject.channels.erase(it);
}
}
}
asio::awaitable<void> notify_all(
Subject& subject, std::shared_ptr<Response> response)
{
co_await for_each_channel(
subject,
[&](Subject::Channel& channel)
{
return channel.async_send(
std::error_code{},
response,
asio::use_awaitable); // blocks here
});
}
asio::awaitable<void> close(Subject& subject)
{
co_await for_each_channel(
subject,
[&](Subject::Channel& channel)
{
return channel.async_send(
std::error_code{asio::error::operation_aborted},
nullptr,
asio::use_awaitable);
});
}
auto& add_observer(Subject& subject, auto executor)
{
return subject.channels.emplace_back(executor);
}
void remove_observer(Subject::Channel& observer)
{
observer.close();
}
asio::awaitable<void> producer(Subject& subject)
{
for(;;)
{
auto data = std::make_shared<Response>();
co_await notify_all(subject, std::move(data));
}
co_await close(subject);
}
asio::awaitable<void> consumer(Subject& subject)
{
bool ok{true};
auto& observer =
add_observer(subject, co_await asio::this_coro::executor);
while(ok)
{
auto const [ec, response] = co_await observer.async_receive(
asio::as_tuple(asio::use_awaitable));
if(ec)
{
break;
}
co_await treatment(); // treat the response
}
我的问题是为什么 async_send 不是异步的。
如何避免阻塞生产者线程?
是否有比 boost 文档更有用/有用的 boost asio 频道文档。