I'm building a WebSocket server using Boost.Asio and Boost.Beast with SSL. The server accepts connections, but clients are disconnected shortly after connecting. The error message I get is:
Read error: The I/O operation has been aborted because of either a thread exit or an application request (Code: 995)
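Here is a detailed description of my setup:
- async_accept accepts new client connections, each of which is wrapped in an SslWebSocket instance.
- A ConnectionManager keeps track of the connected clients.
- async_read is used to read incoming messages and route them appropriately.
The server logs the following messages in order: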
Code sample:
Below are excerpts of the do_accept, start_read, and main methods:
int main(int argc, char** argv) {
// Create io_context and SSL context
boost::asio::io_context io_context;
asio::ssl::context ssl_ctx(asio::ssl::context::sslv23);
ssl_ctx.set_verify_mode(asio::ssl::verify_none); // Disable SSL verification for testing
// Define the server endpoint (IP and port)
tcp::endpoint endpoint(tcp::v4(), 8080);
auto logger = std::make_shared<FileLogger>(io_context, "server.log"); // Assuming you have a logger class
// Create the WebSocket server
auto server = std::make_shared<WebSocketServer>(io_context, ssl_ctx, endpoint, logger);
// Start the server in a separate thread
std::thread server_thread([server]() {
server->start(); // Assuming you have a start method in WebSocketServer
});
// Run the io_context in the main thread
io_context.run();
// Optionally join the server thread if you want to wait for it
server_thread.join();
return 0; // Exit the program when the server is stopped (if ever)
}
void WebSocketServer::start() {
std::cout << "Starting WebSocket server..." << std::endl;
try {
// Start listening for incoming connections
acceptor_.listen();
do_accept(); // Begin accepting connections
}
catch (const std::exception& e) {
std::cerr << "Error starting server: " << e.what() << std::endl;
}
}
void WebSocketServer::do_accept() {
std::cout << "Waiting for connections..." << std::endl;
acceptor_.listen();
// Use a shared pointer to manage the resolver's lifetime
auto resolver = std::make_shared<tcp::resolver>(acceptor_.get_executor());
acceptor_.async_accept(asio::bind_executor(strand_, [this, resolver](beast::error_code ec, tcp::socket socket) {
if (!ec) {
auto ws = std::make_shared<SslWebSocket>(std::move(socket), ssl_ctx_);
std::cout << "Connection accepted." << std::endl;
// Attempting to add client immediately
std::cout << "Attempting to add client..." << std::endl;
// Add the connection to the manager
connection_manager_->add_connection(ws);
// Use a separate post to ensure thread safety
asio::post(strand_, [this, ws]() {
// Check if the connection was added successfully
if (connection_manager_->is_connected(ws)) {
std::cout << "Client added successfully." << std::endl;
// Start the ping timer
asio::steady_timer ping_timer(ws->get_executor(), std::chrono::seconds(30));
start_ping(ws, ping_timer, std::chrono::seconds(30));
// Start reading messages from the client
start_read(ws); // Start reading messages from the client
} else {
std::cout << "Failed to add client." << std::endl;
// Close the WebSocket if it was not added
ws->async_close(beast::websocket::close_code::normal, [](beast::error_code ec) {
if (ec) {
std::cerr << "Error closing WebSocket: " << ec.message() << std::endl;
}
});
}
});
} else {
std::cerr << "Error on accept: " << ec.message() << std::endl;
}
// Continue accepting new connections
do_accept(); // This should be called after handling the current connection
}));
}
void WebSocketServer::start_read(std::shared_ptr<SslWebSocket> ws) {
auto buffer = std::make_shared<beast::flat_buffer>(); // Use a unique buffer for this read
ws->async_read(*buffer, [this, ws, buffer](beast::error_code ec, std::size_t bytes_transferred) {
if (ec) {
std::cout << "Read error: " << ec.message() << " (Code: " << ec.value() << ")" << std::endl;
connection_manager_->remove_connection(ws);
return;
}
// Process the received message
std::string received_message(beast::buffers_to_string(buffer->data()));
buffer->consume(bytes_transferred); // Clear the buffer after reading
// Here, you can handle the received message (e.g., route it)
Message msg = Message::fromJson(received_message); // Assuming the message is a JSON string
route_message(msg, ws); // Route the message to the appropriate handler
// Continue reading if the connection is still valid
if (connection_manager_->is_connected(ws)) {
start_read(ws); // Continue reading if connection is still valid
}
else {
std::cout << "Client has disconnected. Stopping reads." << std::endl;
}
});
}
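Question:
What causes the 995 error code? The ConnectionManager successfully adds and tracks each client.

I made the code from the first question self-contained. Adding a Tracing wrapper shows where the websocket is destroyed: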
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <iostream>
namespace asio = boost::asio;
namespace beast = boost::beast;
using asio::ip::tcp;
template <typename T>
struct Tracing : T {
using T::T;
using T::operator=;
~Tracing() { std::cout << __PRETTY_FUNCTION__ << std::endl; }
};
using SslWebSocket = Tracing<boost::beast::websocket::stream<boost::beast::ssl_stream<tcp::socket>>>;
struct Message {
std::string type;
std::string data;
static Message fromJson(std::string const& /*json*/) {
// Parse the JSON string and return a Message object
return Message{"type", "data"};
}
};
struct ConnectionManager {
void add_connection(std::shared_ptr<SslWebSocket>) {} // Add the connection to the manager
void remove_connection(std::shared_ptr<SslWebSocket>) {} // Remove the connection from the manager
bool is_connected(std::shared_ptr<SslWebSocket>) { return true; } // Check if the connection is still valid
};
struct FileLogger {
FileLogger(asio::io_context&, std::string const&) {} // Open the log file
void log(std::string const& message);
};
class WebSocketServer {
public:
WebSocketServer(asio::io_context& io_context, asio::ssl::context& ssl_ctx, tcp::endpoint const& endpoint,
std::shared_ptr<FileLogger> logger)
: io_context_(io_context)
, ssl_ctx_(ssl_ctx)
, acceptor_(io_context, endpoint)
, strand_(make_strand(io_context))
, logger_(logger) //
{
connection_manager_ = std::make_shared<ConnectionManager>();
}
void start() { do_accept(); } // Start accepting connections
void stop() { acceptor_.close(); } // Stop accepting new connections
private:
void do_accept();
void start_read(std::shared_ptr<SslWebSocket> ws);
void start_ping(std::shared_ptr<SslWebSocket> ws, asio::steady_timer& timer,
std::chrono::seconds interval);
void route_message(Message const& msg, std::shared_ptr<SslWebSocket> ws);
asio::io_context& io_context_;
asio::ssl::context& ssl_ctx_;
tcp::acceptor acceptor_;
asio::strand<asio::io_context::executor_type> strand_;
std::shared_ptr<FileLogger> logger_;
std::shared_ptr<ConnectionManager> connection_manager_;
};
void WebSocketServer::do_accept() {
std::cout << "Waiting for connections..." << std::endl;
acceptor_.listen();
// Use a shared pointer to manage the resolver's lifetime
auto resolver = std::make_shared<tcp::resolver>(acceptor_.get_executor());
acceptor_.async_accept(
asio::bind_executor(strand_, [this, resolver](beast::error_code ec, tcp::socket socket) {
if (!ec) {
auto ws = std::make_shared<SslWebSocket>(std::move(socket), ssl_ctx_);
std::cout << "Connection accepted." << std::endl;
// Attempting to add client immediately
std::cout << "Attempting to add client..." << std::endl;
// Add the connection to the manager
connection_manager_->add_connection(ws);
// Use a separate post to ensure thread safety
asio::post(strand_, [this, ws]() {
// Check if the connection was added successfully
if (connection_manager_->is_connected(ws)) {
std::cout << "Client added successfully." << std::endl;
// Start the ping timer
asio::steady_timer ping_timer(ws->get_executor(), std::chrono::seconds(30));
start_ping(ws, ping_timer, std::chrono::seconds(30));
// Start reading messages from the client
start_read(ws); // Start reading messages from the client
} else {
std::cout << "Failed to add client." << std::endl;
// Close the WebSocket if it was not added
ws->async_close(beast::websocket::close_code::normal, [](beast::error_code ec) {
if (ec) {
std::cerr << "Error closing WebSocket: " << ec.message() << std::endl;
}
});
}
});
} else {
std::cerr << "Error on accept: " << ec.message() << std::endl;
}
// Continue accepting new connections
do_accept(); // This should be called after handling the current connection
}));
}
void WebSocketServer::start_read(std::shared_ptr<SslWebSocket> ws) {
auto buffer = std::make_shared<beast::flat_buffer>(); // Use a unique buffer for this read
ws->async_read(*buffer, [this, ws, buffer](beast::error_code ec, std::size_t bytes_transferred) {
if (ec) {
std::cout << "Read error: " << ec.message() << " (Code: " << ec.value() << ")" << std::endl;
connection_manager_->remove_connection(ws);
return;
}
// Process the received message
std::string received_message(beast::buffers_to_string(buffer->data()));
buffer->consume(bytes_transferred); // Clear the buffer after reading
// Here, you can handle the received message (e.g., route it)
Message msg = Message::fromJson(received_message); // Assuming the message is a JSON string
route_message(msg, ws); // Route the message to the appropriate handler
// Continue reading if the connection is still valid
if (connection_manager_->is_connected(ws)) {
start_read(ws); // Continue reading if connection is still valid
} else {
std::cout << "Client has disconnected. Stopping reads." << std::endl;
}
});
}
int main() {
// Create io_context and SSL context
boost::asio::io_context io_context;
asio::ssl::context ssl_ctx(asio::ssl::context::sslv23);
ssl_ctx.set_verify_mode(asio::ssl::verify_none); // Disable SSL verification for testing
// Define the server endpoint (IP and port)
tcp::endpoint endpoint(tcp::v4(), 8080);
auto logger = std::make_shared<FileLogger>(io_context, "server.log"); // Assuming you have a logger class
// Create the WebSocket server
auto server = std::make_shared<WebSocketServer>(io_context, ssl_ctx, endpoint, logger);
// Start the server in a separate thread
std::thread server_thread([server]() {
server->start(); // Assuming you have a start method in WebSocketServer
std::cout << "server_thread already exited" << std::endl;
});
// Run the io_context in the main thread
io_context.run();
// Optionally join the server thread if you want to wait for it
server_thread.join();
}
// more stubs:
void WebSocketServer::start_ping(std::shared_ptr<SslWebSocket> ws, asio::steady_timer& timer,
std::chrono::seconds interval) {
timer.async_wait([this, ws, &timer, interval](beast::error_code ec) {
if (!ec) {
ws->async_ping({}, [this, ws, &timer, interval](beast::error_code ec) {
if (ec) {
std::cerr << "Ping failed: " << ec.message() << std::endl;
connection_manager_->remove_connection(ws);
return;
}
// Restart the timer for the next ping
start_ping(ws, timer, interval);
});
} else {
std::cerr << "Ping timer error: " << ec.message() << std::endl;
connection_manager_->remove_connection(ws);
}
});
}
void WebSocketServer::route_message(Message const& msg, std::shared_ptr<SslWebSocket>) {
// Route the message based on the message type
if (msg.type == "type1") {
// Handle type1 message
} else if (msg.type == "type2") {
// Handle type2 message
} else {
// Unknown message type
}
}
// FileLogger stubs
void FileLogger::log(std::string const& message) { std::cout << "Logged message: " << message << std::endl; }
Output from a sample run:
Waiting for connections...
server_thread already exited
Connection accepted.
Attempting to add client...
Waiting for connections...
Client added successfully.
Read error: Operation canceled (Code: 125)
Ping timer error: Operation canceled
Tracing<boost::beast::websocket::stream<boost::beast::ssl_stream<boost::asio::basic_stream_socket<boost::asio::ip::
tcp>>>>::~Tracing() [T = boost::beast::websocket::stream<boost::beast::ssl_stream<boost::asio::basic_stream_socket<
boost::asio::ip::tcp>>>]
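A debugger will help you see that the problem is that ping_timer is a local variable: destroying it immediately cancels the timer. That makes timer.async_wait complete with an error ("Ping timer error: Operation canceled" in the output above), and its handler removes the connection.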
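There are many other problems as well.
- It would be more sensible to have a per-connection class (using std::enable_shared_from_this) instead of shuffling many shared-pointer arguments around.
- Use std::weak_ptr to keep track of the connections. Then ConnectionManager::is_connected can use weak_ptr::expired() to check whether a connection is still valid.
- ConnectionManager is owned solely by WebSocketServer and is not passed in as a dependency, so the shared/dynamic allocation adds no value.
If I didn't have to spend my time elsewhere soon, I might post a fixed version.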