带有客户端/服务器的 Websocket boost/beast 示例

问题描述 投票:0回答:1

也许有人可以提供关于如何使用 boost/beast 库中的 websocket 设置客户端和服务器的简单示例(或参考)?我需要一个关于如何处理服务器上的输入消息并响应/不响应它的示例(这样客户端就不会崩溃),如何将一些数据发送到特定的“订阅”连接以及如何在客户端处理它。 我找到了这个例子,但它没有按预期工作: 客户端:

#include <boost/lockfree/queue.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;

// FIFO queue whose operations are serialized by an internal mutex so that it
// can be shared between producer and consumer threads.
template <typename T>
class ThreadSafeQueue {
public:
    // Appends a copy of `value` and signals one thread waiting in wait_and_pop().
    void push(const T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        queue_.push(value);
        cond_var_.notify_one();
    }

    // Non-blocking removal of the oldest element.
    // Returns true and assigns to `value` if an element was available;
    // returns false and leaves `value` untouched when the queue is empty.
    bool try_pop(T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        const bool has_item = !queue_.empty();
        if (has_item) {
            value = queue_.front();
            queue_.pop();
        }
        return has_item;
    }

    // Blocks until an element is available, then removes it into `value`.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        // Loop to tolerate spurious wake-ups of the condition variable.
        while (queue_.empty()) {
            cond_var_.wait(guard);
        }
        value = queue_.front();
        queue_.pop();
    }

private:
    std::queue<T> queue_;               // pending elements, oldest at the front
    mutable std::mutex mtx_;            // guards queue_
    std::condition_variable cond_var_;  // notified by push()
};

class Singleton {
public:
    ThreadSafeQueue<json>* data_queue;
    websocket::stream<tcp::socket>* ws;

    Singleton(
        ThreadSafeQueue<json>* data_queue_,
        websocket::stream<tcp::socket>* ws_
    ) :
        data_queue(data_queue_),
        ws(ws_)
    {}
};

// Reader loop: blocks on the websocket, parses every received message as JSON,
// pushes it onto data.data_queue and echoes the raw text to stdout.
// The loop ends when a read or a parse throws; the error is reported on stdout.
void readData(Singleton& data) {
    try {
        beast::flat_buffer buffer;
        while (true) {
            data.ws->read(buffer);
            auto received_message = beast::buffers_to_string(buffer.data());
            json received_json = json::parse(received_message);

            data.data_queue->push(received_json);
            std::cout << "Received from server: " << received_message << "\n";

            // Drop the processed bytes so the next read starts with an empty buffer.
            buffer.consume(buffer.size());
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // derived exception and what() would no longer describe the real error.
    catch (const std::exception& e) {
        std::cout << "Error occured in reader: " << e.what() << "\n";
    }
}

// Sender loop: prompts on stdout, reads one line from stdin and writes it to
// the websocket as a one-entry JSON object. The key is "echo" when the line
// starts with 'h' and "null" otherwise. Typing "stop" or reaching the end of
// stdin ends the loop; errors are reported on stdout.
void sendData(Singleton& data) {
    try {
        std::string input;
        while (true) {
            std::cout << "To send: ";

            // Stop on EOF or a stream error. Without this check a closed stdin
            // makes getline fail on every iteration and the loop would keep
            // sending empty messages forever.
            if (!std::getline(std::cin, input)) break;

            if (input == "stop") break;

            // input[0] on an empty string yields '\0', so an empty line maps to "null".
            json message = {
                {input[0] == 'h' ? "echo" : "null", input}
            };

            data.ws->write(asio::buffer(message.dump()));
        }
    }
    // Catch by const reference so the derived exception is not sliced and
    // what() reports the real error.
    catch (const std::exception& e) {
        std::cout << "Error occured in sender: " << e.what() << "\n";
    }
}

// Client entry point: connects to 127.0.0.1:9002, performs the websocket
// handshake on "/", then runs readData and sendData on separate threads and
// waits for both to finish.
int main() {
    const std::string host{"127.0.0.1"};
    const std::string port{"9002"};

    asio::io_context ioc;
    tcp::resolver resolver(ioc);
    websocket::stream<tcp::socket> ws(ioc);

    // Resolve the server address and connect the underlying TCP socket.
    const auto endpoints = resolver.resolve(host, port);
    asio::connect(ws.next_layer(), endpoints);

    // Upgrade the TCP connection to a websocket session.
    ws.handshake(host, "/");

    // State shared by both worker threads (passed by reference).
    ThreadSafeQueue<json> data_queue;
    Singleton single(&data_queue, &ws);

    std::thread reader(readData, std::ref(single));
    std::thread sender(sendData, std::ref(single));

    reader.join();
    sender.join();

    return 0;
}

服务器端:

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;

class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
    WebSocketSession(tcp::socket socket)
        : ws_(std::move(socket)) {}

    void run() {
        ws_.async_accept(
            beast::bind_front_handler(
                &WebSocketSession::on_accept,
                shared_from_this()
            )
        );
    }

private:
    void on_accept(beast::error_code ec) {
        if (ec) {
            std::cerr << "Accept error: " << ec.message() << std::endl;
            return;
        }
        do_read();
    }

    void do_read() {
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(
                &WebSocketSession::on_read,
                shared_from_this()
            )
        );
    }

    void on_read(beast::error_code ec, std::size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec) {
            if (ec == websocket::error::closed) {
                return;
            }
            std::cerr << "Read error: " << ec.message() << std::endl;
            return;
        }

        try {
            auto received_message = beast::buffers_to_string(buffer_.data());
            json received_json = json::parse(received_message);

            std::string response_message;

            if (received_json.contains("echo")) {
                json response_json = {
                    {"type", "response"},
                    {"original", received_json}
                };
                response_message = response_json.dump();
            }
            else {
                buffer_.consume(buffer_.size());
                do_read();
                return;
            }

            response_ptr_ = std::make_shared<std::string>(std::move(response_message));
            ws_.text(ws_.got_text());
            ws_.async_write(
                asio::buffer(*response_ptr_),
                beast::bind_front_handler(
                    &WebSocketSession::on_write,
                    shared_from_this()));
        }
        catch (const std::exception& e) {
            std::cerr << "Processing error: " << e.what() << std::endl;
        }
    }

    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec) {
            std::cerr << "Write error: " << ec.message() << std::endl;
            return;
        }

        buffer_.consume(buffer_.size());
        response_ptr_.reset();
        do_read();
    }

    websocket::stream<tcp::socket> ws_;
    beast::flat_buffer buffer_;
    std::shared_ptr<std::string> response_ptr_;
};

// Accepts TCP connections on the given endpoint and starts a WebSocketSession
// for each one. The server must outlive all pending accepts because the
// completion handler is bound to a raw `this`.
class WebSocketServer {
public:
    // Opens the acceptor on `endpoint` and queues the first accept.
    WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint)
        : acceptor_(ioc, endpoint) {
        do_accept();
    }

private:
    // Queues one asynchronous accept; on_accept re-arms it.
    void do_accept() {
        acceptor_.async_accept(
            beast::bind_front_handler(&WebSocketServer::on_accept, this));
    }

    // Accept completion: starts a session on success, logs on failure, and in
    // both cases waits for the next connection.
    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (!ec) {
            auto session = std::make_shared<WebSocketSession>(std::move(socket));
            session->run();
        }
        else {
            std::cerr << "Accept error: " << ec.message() << std::endl;
        }
        do_accept();
    }

    tcp::acceptor acceptor_;
};

// Server entry point: listens on IPv4 port 9002 and runs the io_context on the
// calling thread. Returns EXIT_FAILURE if an exception escapes.
int main() {
    std::cout << "Web server is running:\n";

    int exit_code = EXIT_SUCCESS;
    try {
        const unsigned short port = 9002;

        asio::io_context ioc;
        tcp::endpoint endpoint(tcp::v4(), port);
        WebSocketServer server(ioc, endpoint);

        // Blocks while there is outstanding asynchronous work.
        ioc.run();
    }
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        exit_code = EXIT_FAILURE;
    }
    return exit_code;
}
c++ concurrency boost-beast boost-beast-websocket
1个回答
0
投票

在我看来,客户端并没有崩溃,而是只正确发送了一条消息(截图:https://i.imgur.com/0zrvFBP.jpeg)。

在 ASan/UBSan 下运行代码没有发现明显的问题(很好)。

要让客户端实现全双工,您需要使用线程或异步方式。我建议使用异步,因为它与您所选的库更契合。

从动态缓冲区中消费数据时要小心:不要直接消费整个缓冲区,除非其中的全部内容都属于当前消息。

您需要对传出写入进行排队,因为您不能有重叠的写入,并且显然您的接口是多线程的。

在服务器端,您通过确保在发送响应之前不读取下一个请求来使事情变得简单。我注意到

response_ptr_
是多余的,因为它已经是
shared_from_this()
对象的一部分。所以,我把它改成了
std::string response_;

服务器备注:

  • 使用
    ws_.got_text()
    很奇怪,因为你发送的内容肯定是 JSON 文本
  • 只在一个位置(而不是两个位置)消费已处理的消息
  • 我会考虑不创建字符串,因为 Nlohmann 可以从迭代器中解析
  • 确保刷新 stdio 以查看“实时活动”

综合以上几点,我建议改成:

  • 文件

    client.cpp

     #include <boost/asio.hpp>
     #include <boost/beast.hpp>
     #include <iostream>
     #include <nlohmann/json.hpp>
     #include <queue>
     #include <utility>
    
     namespace asio      = boost::asio;
     namespace beast     = boost::beast;
     namespace websocket = beast::websocket;
     using tcp           = asio::ip::tcp;
     using json          = nlohmann::json;
    
     // Thread-safe FIFO queue: every operation takes an internal mutex.
     // Elements are moved in and out rather than copied, so expensive-to-copy
     // and move-only element types are handled without extra copies.
     template <typename T> class ThreadSafeQueue {
       public:
         // Enqueues `value` (taken by value and moved into the queue) and wakes
         // one thread blocked in wait_and_pop().
         void push(T value) {
             std::lock_guard<std::mutex> lock(mtx_);
             queue_.push(std::move(value));
             cond_var_.notify_one();
         }

         // Non-blocking pop. Returns true and move-assigns the oldest element
         // into `value`; returns false and leaves `value` untouched when empty.
         bool try_pop(T& value) {
             std::lock_guard<std::mutex> lock(mtx_);
             if (queue_.empty()) {
                 return false;
             }
             // The front element is removed right after, so moving from it is safe.
             value = std::move(queue_.front());
             queue_.pop();
             return true;
         }

         // Blocks until an element is available, then move-assigns it into `value`.
         void wait_and_pop(T& value) {
             std::unique_lock<std::mutex> lock(mtx_);
             cond_var_.wait(lock, [this] { return !queue_.empty(); });
             value = std::move(queue_.front());
             queue_.pop();
         }

       private:
         std::queue<T>           queue_;    // pending elements, oldest at the front
         mutable std::mutex      mtx_;      // guards queue_
         std::condition_variable cond_var_; // notified by push()
     };
     using MsgQueue = ThreadSafeQueue<json>;
    
     struct  WSClient {
         WSClient(std::string const& host, std::string const& port, MsgQueue& inbox) : inbox_(inbox) {
             // connect, handshake
             connect(ws.next_layer(), tcp::resolver (ioc).resolve(host, port));
             ws.handshake(host, "/");
    
             // start async read chain
             do_read_loop();
         };
    
         void stop() {
             beast::get_lowest_layer(ws).cancel();
             ioc.join();
         }
    
         void send(json&& message) {
             asio::post(ws.get_executor(), [this, m = std::move(message)]() mutable { //
                 outbox_.push_back(std::move(m).dump());
                 if (outbox_.size() == 1)
                     do_write_loop(); // start the pump
             });
         }
    
       private:
         asio::thread_pool  ioc{1}; // single thread should suffice
         beast::flat_buffer incoming_;
         MsgQueue&          inbox_;
    
         std::deque<std::string>        outbox_; // serialized form for buffer stability
         websocket::stream<tcp::socket> ws{ioc};
    
         void do_read_loop() {
             ws.async_read(incoming_, [this](beast::error_code ec, size_t n) {
                 std::cout << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
                 if (ec)
                     return;
    
                 auto received_message = beast::buffers_to_string(incoming_.data()).substr(0, n);
                 json received_json    = json::parse(received_message);
    
                 inbox_.push(received_json);
                 std::cout << "Received from server: " << received_message << std::endl;
    
                 incoming_.consume(n);
                 do_read_loop();
             });
         }
    
         void do_write_loop() {
             if (outbox_.empty())
                 return;
    
             ws.async_write(asio::buffer(outbox_.front()), [this](beast::error_code ec, size_t n) {
                 std::cout << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
                 if (!ec) {
                     outbox_.pop_front();
                     do_write_loop();
                 }
             });
         }   
     };
    
     // Client entry point: connects to 127.0.0.1:9002, then reads lines from
     // stdin and sends each as {"echo": line} (lines starting with 'h') or
     // {"null": line}. After every send, messages already received are drained
     // from the queue and printed. "stop" or end of input ends the loop.
     // Returns 1 if an exception escapes, 0 otherwise.
     int main() try {
         MsgQueue received;
         WSClient client("127.0.0.1", "9002", received);

         std::cout << "To send: ";
         for (std::string input; std::getline(std::cin, input); std::cout << "To send: ") {
             if (input == "stop")
                 break;
             if (input.empty())
                 continue;

             // Braced-init-list elements are evaluated left to right, so the
             // prefix test runs before `input` is moved from.
             client.send({{input.starts_with('h') ? "echo" : "null", std::move(input)}});

             for (json msg; received.try_pop(msg);)
                 std::cout << " - Processing queued: " << msg << std::endl;
         }

         client.stop();
     } catch (std::exception const& e) {
         std::cerr << "Error: " << e.what() << std::endl;
         // Without an explicit return, falling off the end of this handler is
         // treated like falling off the end of main, i.e. exit status 0.
         return 1;
     }
    
  • 文件

    server.cpp

     #include <boost/asio.hpp>
     #include <boost/beast.hpp>
     #include <iostream>
     #include <nlohmann/json.hpp>
    
     namespace asio      = boost::asio;
     namespace beast     = boost::beast;
     namespace websocket = beast::websocket;
     using tcp           = asio::ip::tcp;
     using json          = nlohmann::json;
    
     // One server-side websocket connection. Every pending asynchronous
     // operation binds shared_from_this(), so the object must be owned by a
     // std::shared_ptr before run() is called. Messages containing the key
     // "echo" are answered; everything else is read and ignored.
     class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
       public:
         // Takes over the accepted TCP socket.
         explicit WebSocketSession(tcp::socket socket) : ws_(std::move(socket)) {}

         // Starts the asynchronous websocket handshake with the peer.
         void run() {
             ws_.async_accept(beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
         }

         ~WebSocketSession() { std::cerr << "Session " << peer_ << " closed" << std::endl; }

       private:
         // Best-effort lookup of the peer address, used only for logging.
         // The error_code overload is used so that a peer which has already
         // disconnected does not throw out of the constructor (and from there
         // out of the acceptor's handler); in that case the returned endpoint
         // is whatever the overload yields on failure.
         static tcp::endpoint peer_of(tcp::socket const& socket) {
             beast::error_code ignored;
             return socket.remote_endpoint(ignored);
         }

         // Handshake completion: logs the outcome and starts reading on success.
         void on_accept(beast::error_code ec) {
             std::cerr << "Accept: " << ec.message() << " for " << peer_ << std::endl;
             if (!ec)
                 do_read_loop();
         }

         // Queues an asynchronous read of the next message into buffer_.
         void do_read_loop() {
             ws_.async_read(buffer_, beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
         }

         // Read completion: parses the n bytes of the message as JSON and either
         // answers it (key "echo" present) or goes straight back to reading.
         void on_read(beast::error_code ec, size_t n) {
             if (ec) {
                 // A closed websocket is not logged; any other error is.
                 if (ec == websocket::error::closed) {
                     return;
                 }
                 std::cerr << "Read error: " << ec.message() << std::endl;
                 return;
             }

             try {
                 response_.clear();

                 // Parse straight from the buffer; only the n bytes of this
                 // message are parsed and then consumed.
                 auto it  = buffers_begin(buffer_.data());
                 json msg = json::parse(it, it + n);
                 buffer_.consume(n);

                 if (msg.contains("echo"))
                     response_ = json{{"type", "response"}, {"original", std::move(msg)}}.dump();

                 if (response_.empty()) {
                     do_read_loop();
                 } else {
                     ws_.text(true);
                     // response_ is a member, so the buffer stays valid until on_write.
                     ws_.async_write(asio::buffer(response_),
                                     beast::bind_front_handler(&WebSocketSession::on_write, shared_from_this()));
                 }
             } catch (std::exception const& e) {
                 // No further read is started here, so the session ends after a
                 // processing error.
                 std::cerr << "Processing error: " << e.what() << std::endl;
             }
         }

         // Write completion: logs the result and reads the next message on success.
         void on_write(beast::error_code ec, size_t n) {
             std::cerr << "Write: " << n << " bytes (" << ec.message() << ")" << std::endl;
             if (!ec)
                 do_read_loop();
         }

         websocket::stream<tcp::socket> ws_;
         beast::flat_buffer             buffer_;
         std::string                    response_;
         tcp::endpoint                  peer_ = peer_of(ws_.next_layer()); // initialized after ws_
     };
    
     // Listens on the given endpoint and starts one WebSocketSession per
     // accepted connection. The completion handler is bound to a raw `this`, so
     // the server must stay alive while accepts are pending.
     class WebSocketServer {
       public:
         // Opens the acceptor on `endpoint` and queues the first accept.
         WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint) : acceptor_(ioc, endpoint) {
             do_accept();
         }

       private:
         // Queues one asynchronous accept; on_accept re-arms it.
         void do_accept() {
             acceptor_.async_accept(
                 beast::bind_front_handler(&WebSocketServer::on_accept, this));
         }

         // Accept completion: starts a session on success, logs on failure, and
         // in both cases waits for the next connection.
         void on_accept(beast::error_code ec, tcp::socket socket) {
             if (!ec) {
                 auto session = std::make_shared<WebSocketSession>(std::move(socket));
                 session->run();
             } else {
                 std::cerr << "Accept error: " << ec.message() << std::endl;
             }
             do_accept();
         }

         tcp::acceptor acceptor_;
     };
    
     // Server entry point: listens on IPv4 port 9002 and runs the io_context on
     // the calling thread. Returns 1 if an exception escapes, 0 otherwise.
     int main() {
         std::cout << "Web server is running:\n";

         int status = 0;
         try {
             asio::io_context ioc;
             tcp::endpoint    endpoint(tcp::v4(), 9002);
             WebSocketServer  server(ioc, endpoint);

             // Blocks while there is outstanding asynchronous work.
             ioc.run();
         } catch (std::exception const& e) {
             std::cerr << "Error: " << e.what() << std::endl;
             status = 1;
         }
         return status;
     }
    

使用习惯演示:

© www.soinside.com 2019 - 2024. All rights reserved.