我有一个基于 boost::asio 和 Beast 并支持协程的程序。
程序通过 Beast Websocket 连接到远程服务器,并通过 Beast Websocket 发送请求,请求由其内部逻辑触发。
所以基本上,请求非常频繁且繁重。
我用
co_spawn(strand_, sendRequest(data), asio::detached)
。 sendRequest
是一个协程,它将数据发送到远程服务器,如下所示:
ws_->async_write(strand_, asio::buffer(data), asio::use_awaitable)
两个地方的
strand_
是同一个,ws_
也是用strand初始化的:
class Adapter {
Adapter(asio::strand<boost::asio::io_context::executor_type>&strand)
, strand_(strand)
, ssl_context_(asio::ssl::context::tlsv12_client)
, ws_(new websocket::stream<beast::ssl_stream<beast::tcp_stream>>(strand, ssl_context_)) {}
void handleRequest(std::string& data) {
// do some checks and have a new_data
co_spawn(strand_, sendRequest(new_data), boost::asio::detached);
}
asio::awaitable<void> sendRequest(const std::string& data) {
// do some conversion to have a new_data
co_await ws_->async_write(strand_, asio::buffer(new_data), asio::use_awaitable);
co_return;
}
protected:
asio::strand<boost::asio::io_context::executor_type>&strand_;
asio::ssl::context ssl_context_;
std::unique_ptr<websocket::stream<beast::ssl_stream<beast::tcp_stream>>> ws_;
}
不过,handleReques
可以非常快地触发。
但是如果请求太频繁,我就会崩溃。它发生在野兽中
soft_mutex.hpp
:
try_lock(T const*)
{
// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);
if(id_ != 0)
return false;
id_ = T::id;
return true;
}
如果我将
co_spawn(strand_, sendRequest(data), asio::detached)
替换为这样的空协程:
co_spawn(strand_, []() -> asio::awaitable<void> {co_return;}, asio::detached)
没关系。所以它肯定发生在
ws_->async_write
中的 sendRequest
。
我尝试用
asio::post
来包裹co_spawn
,但还是不行。
我在带有 Boost 1.85 的 Ubuntu 22.04 上执行此操作。
注意:如果我使用
ws_->write
,它就有效。
您正在启动多次写入。即使启动发生在链上,您也没有任何措施来保护任何先前的写入操作之前已完成。
无法展示如何修复代码,因为没有显示任何内容。一般来说,解决方案将涉及一个队列和一个写入循环,该循环从队列中发送消息,直到队列为空。
您可以看到我的许多 ASIO 示例,其中包含
std::deque<...>
,我通常将其命名为 outbox_
或类似名称。
问题中的代码不完整。这是一个独立的草图,展示了正确的解决方案的样子。请注意,您必须维持
a
的生命周期,直到使用它的所有(分离的)操作完成为止。
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <deque>
namespace beast = boost::beast;
namespace asio = boost::asio;
namespace websocket = boost::beast::websocket;
namespace ssl = boost::asio::ssl;
struct Adapter {
using Message = std::string;
explicit Adapter(asio::any_io_executor ex) : ws_{ex, ssl_context_} {}
void handleRequest(std::string msg) {
// do some checks and have a message
co_spawn(ws_.get_executor(), sendRequest(std::move(msg)), boost::asio::detached);
}
protected:
asio::awaitable<void> sendRequest(std::string message) {
queue_.push_back(std::move(message));
if (queue_.size() == 1)
co_await writeLoop(); // only one write loop can be active
}
asio::awaitable<void> writeLoop() {
while (!queue_.empty()) {
co_await ws_.async_write(asio::buffer(queue_.front()), asio::deferred);
queue_.pop_front();
}
}
private:
ssl::context ssl_context_{ssl::context::tlsv12_client};
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
std::deque<std::string> queue_;
};
int main() {
asio::thread_pool ioc;
Adapter a{make_strand(ioc)};
ioc.join();
}