Boost.Beast WebSocket server - "Read error: The I/O operation has been aborted" on client connections


I am building a WebSocket server using Boost.Asio and Boost.Beast with SSL. The server accepts connections, but clients are disconnected shortly after connecting. The error message I receive is:

Read error: The I/O operation has been aborted because of either a thread exit or an application request (Code: 995)

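Here is a breakdown of my setup:

  1. The server uses async_accept to accept new client connections, each of which is wrapped in an SslWebSocket instance.
  2. A ConnectionManager keeps track of the connected clients.
  3. I use async_read to read incoming messages and route them appropriately.
  4. There is also a ping mechanism to keep connections alive.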

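The server logs the following messages, in this order:

  • "Starting WebSocket server..."
  • "Waiting for connections..."
  • "Connection accepted."
  • "Attempting to add client..."
  • "Client added successfully."
  • After that, I get the read error mentioned above.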

Code sample:

Here are excerpts of the main, start, do_accept, and start_read methods:

int main(int argc, char** argv) {
    // Create io_context and SSL context
    boost::asio::io_context io_context;
    asio::ssl::context ssl_ctx(asio::ssl::context::sslv23);
    ssl_ctx.set_verify_mode(asio::ssl::verify_none); // Disable SSL verification for testing

    // Define the server endpoint (IP and port)
    tcp::endpoint endpoint(tcp::v4(), 8080);
    auto logger = std::make_shared<FileLogger>(io_context, "server.log"); // Assuming you have a logger class

    // Create the WebSocket server
    auto server = std::make_shared<WebSocketServer>(io_context, ssl_ctx, endpoint, logger);

    // Start the server in a separate thread
    std::thread server_thread([server]() {
        server->start(); // Assuming you have a start method in WebSocketServer
        });

    // Run the io_context in the main thread
    io_context.run();

    // Optionally join the server thread if you want to wait for it
    server_thread.join();

    return 0; // Exit the program when the server is stopped (if ever)
}

void WebSocketServer::start() {
    std::cout << "Starting WebSocket server..." << std::endl;

    try {
        // Start listening for incoming connections
        acceptor_.listen();
        do_accept();  // Begin accepting connections
    }
    catch (const std::exception& e) {
        std::cerr << "Error starting server: " << e.what() << std::endl;
    }
}
void WebSocketServer::do_accept() {
    std::cout << "Waiting for connections..." << std::endl;
    acceptor_.listen();

    // Use a shared pointer to manage the resolver's lifetime
    auto resolver = std::make_shared<tcp::resolver>(acceptor_.get_executor());

    acceptor_.async_accept(asio::bind_executor(strand_, [this, resolver](beast::error_code ec, tcp::socket socket) {
        if (!ec) {
            auto ws = std::make_shared<SslWebSocket>(std::move(socket), ssl_ctx_);
            std::cout << "Connection accepted." << std::endl;

            // Attempting to add client immediately
            std::cout << "Attempting to add client..." << std::endl;

            // Add the connection to the manager
            connection_manager_->add_connection(ws);

            // Use a separate post to ensure thread safety
            asio::post(strand_, [this, ws]() {
                // Check if the connection was added successfully
                if (connection_manager_->is_connected(ws)) {
                    std::cout << "Client added successfully." << std::endl;

                    // Start the ping timer
                    asio::steady_timer ping_timer(ws->get_executor(), std::chrono::seconds(30));
                    start_ping(ws, ping_timer, std::chrono::seconds(30));

                    // Start reading messages from the client
                    start_read(ws); // Start reading messages from the client
                } else {
                    std::cout << "Failed to add client." << std::endl;
                    // Close the WebSocket if it was not added
                    ws->async_close(beast::websocket::close_code::normal, [](beast::error_code ec) {
                        if (ec) {
                            std::cerr << "Error closing WebSocket: " << ec.message() << std::endl;
                        }
                    });
                }
            });
        } else {
            std::cerr << "Error on accept: " << ec.message() << std::endl;
        }

        // Continue accepting new connections
        do_accept();  // This should be called after handling the current connection
    }));
}

void WebSocketServer::start_read(std::shared_ptr<SslWebSocket> ws) {
    auto buffer = std::make_shared<beast::flat_buffer>(); // Use a unique buffer for this read
    ws->async_read(*buffer, [this, ws, buffer](beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            std::cout << "Read error: " << ec.message() << " (Code: " << ec.value() << ")" << std::endl;
            connection_manager_->remove_connection(ws);
            return;
        }

        // Process the received message
        std::string received_message(beast::buffers_to_string(buffer->data()));
        buffer->consume(bytes_transferred); // Clear the buffer after reading

        // Here, you can handle the received message (e.g., route it)
        Message msg = Message::fromJson(received_message); // Assuming the message is a JSON string
        route_message(msg, ws); // Route the message to the appropriate handler

        // Continue reading if the connection is still valid
        if (connection_manager_->is_connected(ws)) {
            start_read(ws); // Continue reading if connection is still valid
        }
        else {
            std::cout << "Client has disconnected. Stopping reads." << std::endl;
        }
        });
}

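Questions:

  1. What could cause this 995 error code in a Boost.Beast WebSocket server?
  2. Is there any specific thread- or strand-related configuration that could help resolve this?

What I have tried:

  • Ensured that the ConnectionManager successfully adds and tracks each client.
  • Started a ping-pong mechanism to keep connections alive.
  • Confirmed that the SSL context options are configured correctly.
  • Verified that the server is listening correctly on the specified endpoint.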
c++ multithreading boost boost-asio boost-beast
1 Answer

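Starting from the code in the question, I made it self-contained. Adding a Tracer (the Tracing wrapper in the listing) shows where the websocket is destroyed:

Live On Coliru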

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
namespace asio  = boost::asio;
namespace beast = boost::beast;
using asio::ip::tcp;

template <typename T>
struct Tracing : T {
    using T::T;
    using T::operator=;

    ~Tracing() { std::cout << __PRETTY_FUNCTION__ << std::endl; }
};

using SslWebSocket = Tracing<boost::beast::websocket::stream<boost::beast::ssl_stream<tcp::socket>>>;

struct Message {
    std::string type;
    std::string data;

    static Message fromJson(std::string const& /*json*/) {
        // Parse the JSON string and return a Message object
        return Message{"type", "data"};
    }
};

struct ConnectionManager {
    void add_connection(std::shared_ptr<SslWebSocket>) {}    // Add the connection to the manager
    void remove_connection(std::shared_ptr<SslWebSocket>) {} // Remove the connection from the manager
    bool is_connected(std::shared_ptr<SslWebSocket>) { return true; } // Check if the connection is still valid
};

struct FileLogger {
    FileLogger(asio::io_context&, std::string const&) {} // Open the log file
    void log(std::string const& message);
};

class WebSocketServer {
  public:
    WebSocketServer(asio::io_context& io_context, asio::ssl::context& ssl_ctx, tcp::endpoint const& endpoint,
                    std::shared_ptr<FileLogger> logger)
        : io_context_(io_context)
        , ssl_ctx_(ssl_ctx)
        , acceptor_(io_context, endpoint)
        , strand_(make_strand(io_context))
        , logger_(logger) //
    {
        connection_manager_ = std::make_shared<ConnectionManager>();
    }

    void start() { do_accept(); }      // Start accepting connections
    void stop() { acceptor_.close(); } // Stop accepting new connections

  private:
    void do_accept();
    void start_read(std::shared_ptr<SslWebSocket> ws);
    void start_ping(std::shared_ptr<SslWebSocket> ws, asio::steady_timer& timer,
                    std::chrono::seconds interval);
    void route_message(Message const& msg, std::shared_ptr<SslWebSocket> ws);

    asio::io_context&                             io_context_;
    asio::ssl::context&                           ssl_ctx_;
    tcp::acceptor                                 acceptor_;
    asio::strand<asio::io_context::executor_type> strand_;
    std::shared_ptr<FileLogger>                   logger_;
    std::shared_ptr<ConnectionManager>            connection_manager_;
};

void WebSocketServer::do_accept() {
    std::cout << "Waiting for connections..." << std::endl;
    acceptor_.listen();

    // Use a shared pointer to manage the resolver's lifetime
    auto resolver = std::make_shared<tcp::resolver>(acceptor_.get_executor());

    acceptor_.async_accept(
        asio::bind_executor(strand_, [this, resolver](beast::error_code ec, tcp::socket socket) {
            if (!ec) {
                auto ws = std::make_shared<SslWebSocket>(std::move(socket), ssl_ctx_);
                std::cout << "Connection accepted." << std::endl;

                // Attempting to add client immediately
                std::cout << "Attempting to add client..." << std::endl;

                // Add the connection to the manager
                connection_manager_->add_connection(ws);

                // Use a separate post to ensure thread safety
                asio::post(strand_, [this, ws]() {
                    // Check if the connection was added successfully
                    if (connection_manager_->is_connected(ws)) {
                        std::cout << "Client added successfully." << std::endl;

                        // Start the ping timer
                        asio::steady_timer ping_timer(ws->get_executor(), std::chrono::seconds(30));
                        start_ping(ws, ping_timer, std::chrono::seconds(30));

                        // Start reading messages from the client
                        start_read(ws); // Start reading messages from the client
                    } else {
                        std::cout << "Failed to add client." << std::endl;
                        // Close the WebSocket if it was not added
                        ws->async_close(beast::websocket::close_code::normal, [](beast::error_code ec) {
                            if (ec) {
                                std::cerr << "Error closing WebSocket: " << ec.message() << std::endl;
                            }
                        });
                    }
                });
            } else {
                std::cerr << "Error on accept: " << ec.message() << std::endl;
            }

            // Continue accepting new connections
            do_accept(); // This should be called after handling the current connection
        }));
}

void WebSocketServer::start_read(std::shared_ptr<SslWebSocket> ws) {
    auto buffer = std::make_shared<beast::flat_buffer>(); // Use a unique buffer for this read
    ws->async_read(*buffer, [this, ws, buffer](beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            std::cout << "Read error: " << ec.message() << " (Code: " << ec.value() << ")" << std::endl;
            connection_manager_->remove_connection(ws);
            return;
        }

        // Process the received message
        std::string received_message(beast::buffers_to_string(buffer->data()));
        buffer->consume(bytes_transferred); // Clear the buffer after reading

        // Here, you can handle the received message (e.g., route it)
        Message msg = Message::fromJson(received_message); // Assuming the message is a JSON string
        route_message(msg, ws);                            // Route the message to the appropriate handler

        // Continue reading if the connection is still valid
        if (connection_manager_->is_connected(ws)) {
            start_read(ws); // Continue reading if connection is still valid
        } else {
            std::cout << "Client has disconnected. Stopping reads." << std::endl;
        }
    });
}

int main() {
    // Create io_context and SSL context
    boost::asio::io_context io_context;
    asio::ssl::context      ssl_ctx(asio::ssl::context::sslv23);
    ssl_ctx.set_verify_mode(asio::ssl::verify_none); // Disable SSL verification for testing

    // Define the server endpoint (IP and port)
    tcp::endpoint endpoint(tcp::v4(), 8080);
    auto logger = std::make_shared<FileLogger>(io_context, "server.log"); // Assuming you have a logger class

    // Create the WebSocket server
    auto server = std::make_shared<WebSocketServer>(io_context, ssl_ctx, endpoint, logger);

    // Start the server in a separate thread
    std::thread server_thread([server]() {
        server->start(); // Assuming you have a start method in WebSocketServer
        std::cout << "server_thread already exited" << std::endl;
    });

    // Run the io_context in the main thread
    io_context.run();

    // Optionally join the server thread if you want to wait for it
    server_thread.join();
}

// more stubs:

void WebSocketServer::start_ping(std::shared_ptr<SslWebSocket> ws, asio::steady_timer& timer,
                                 std::chrono::seconds interval) {
    timer.async_wait([this, ws, &timer, interval](beast::error_code ec) {
        if (!ec) {
            ws->async_ping({}, [this, ws, &timer, interval](beast::error_code ec) {
                if (ec) {
                    std::cerr << "Ping failed: " << ec.message() << std::endl;
                    connection_manager_->remove_connection(ws);
                    return;
                }

                // Restart the timer for the next ping
                start_ping(ws, timer, interval);
            });
        } else {
            std::cerr << "Ping timer error: " << ec.message() << std::endl;
            connection_manager_->remove_connection(ws);
        }
    });
}

void WebSocketServer::route_message(Message const& msg, std::shared_ptr<SslWebSocket>) {
    // Route the message based on the message type
    if (msg.type == "type1") {
        // Handle type1 message
    } else if (msg.type == "type2") {
        // Handle type2 message
    } else {
        // Unknown message type
    }
}

// FileLogger stubs
void FileLogger::log(std::string const& message) { std::cout << "Logged message: " << message << std::endl; }

Output from a sample run:

Waiting for connections...
server_thread already exited
Connection accepted.
Attempting to add client...
Waiting for connections...
Client added successfully.
Read error: Operation canceled (Code: 125)
Ping timer error: Operation canceled
Waiting for connections...
server_thread already exited
Connection accepted.
Attempting to add client...
Waiting for connections...
Client added successfully.
Read error: Operation canceled (Code: 125)
Ping timer error: Operation canceled
Tracing<boost::beast::websocket::stream<boost::beast::ssl_stream<boost::asio::basic_stream_socket<boost::asio::ip::
tcp>>>>::~Tracing() [T = boost::beast::websocket::stream<boost::beast::ssl_stream<boost::asio::basic_stream_socket<
boost::asio::ip::tcp>>>]

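A debugger will help you see that the problem is that ping_timer is a local variable: its destruction immediately cancels the timer. That makes timer.async_wait complete with operation_aborted (the "Ping timer error: Operation canceled" line in the output), and that handler then removes the connection.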

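Other issues

There are many other problems:

  • Holding a reference to a local timer object leads to UB (undefined behavior).
  • There is no reason to wrap the server start in a thread, since it is asynchronous anyway.
  • It is wiser to wrap everything related to a connection/session in a session type (e.g. using std::enable_shared_from_this) instead of shuffling around many shared-pointer arguments.
  • Keeping shared connections in the connection manager leads to resource leaks, because they may never be released.
  • Given that,
    • always let the destructor do the removal. That avoids missing a spot and also avoids duplicating the removal code.
    • better yet, prefer std::weak_ptr to keep the sessions. Then ConnectionManager::is_connected can use weak_ptr::expired() to check whether the connection is still valid.
  • Since the ConnectionManager is simply owned by the WebSocketServer and is not passed in as a dependency, shared/dynamic allocation adds no value.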
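The weak_ptr suggestion could look roughly like the sketch below. This is an illustration under assumptions, not the fixed version mentioned in this answer: Session is a hypothetical stand-in for the real session type (which would own the websocket and its ping timer), and active_count and prune are names invented for this example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for the real session type.
struct Session {
    explicit Session(int id) : id(id) {}
    int id;
};

class ConnectionManager {
  public:
    // Stores only a weak reference, so the manager never keeps a session alive.
    void add_connection(std::shared_ptr<Session> const& s) {
        assert(s != nullptr);
        prune();
        sessions_.push_back(s);
    }

    // A session counts as connected for as long as something still owns it.
    static bool is_connected(std::weak_ptr<Session> const& s) { return !s.expired(); }

    // Number of sessions that are still alive.
    std::size_t active_count() {
        prune();
        return sessions_.size();
    }

  private:
    // Drops entries whose session has already been destroyed.
    void prune() {
        sessions_.erase(std::remove_if(sessions_.begin(), sessions_.end(),
                                       [](std::weak_ptr<Session> const& w) { return w.expired(); }),
                        sessions_.end());
    }

    std::vector<std::weak_ptr<Session>> sessions_;
};
```

With this shape there is no explicit remove_connection to forget: once the last shared_ptr to a session goes away (typically when its last pending handler completes), the entry simply expires and is pruned on the next access.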

If I did not have to spend my time elsewhere soon, I might post a fixed version.
