也许有人可以提供关于如何使用 boost/beast 库中的 websocket 设置客户端和服务器的简单示例(或参考)?我需要一个关于如何处理服务器上的输入消息并响应/不响应它的示例(这样客户端就不会崩溃),如何将一些数据发送到特定的“订阅”连接以及如何在客户端处理它。 我找到了这个例子,但它没有按预期工作: 客户端:
#include <boost/lockfree/queue.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
// Thread-safe queue class
template <typename T>
class ThreadSafeQueue {
public:
void push(const T& value) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(value);
cond_var_.notify_one();
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(mtx_);
cond_var_.wait(lock, [this] { return !queue_.empty(); });
value = queue_.front();
queue_.pop();
}
private:
std::queue<T> queue_;
mutable std::mutex mtx_;
std::condition_variable cond_var_;
};
class Singleton {
public:
ThreadSafeQueue<json>* data_queue;
websocket::stream<tcp::socket>* ws;
Singleton(
ThreadSafeQueue<json>* data_queue_,
websocket::stream<tcp::socket>* ws_
) :
data_queue(data_queue_),
ws(ws_)
{}
};
void readData(Singleton& data) {
try {
beast::flat_buffer buffer;
while (true) {
data.ws->read(buffer);
auto received_message = beast::buffers_to_string(buffer.data());
json received_json = json::parse(received_message);
data.data_queue->push(received_json);
std::cout << "Received from server: " << received_message << "\n";
buffer.consume(buffer.size());
}
}
catch (std::exception e) {
std::cout << "Error occured in reader: " << e.what() << "\n";
}
}
void sendData(Singleton& data) {
try {
std::string input;
while (true) {
std::cout << "To send: ";
std::getline(std::cin, input);
if (input == "stop") break;
json message = {
{input[0] == 'h' ? "echo" : "null", input}
};
data.ws->write(asio::buffer(message.dump()));
}
}
catch (std::exception e) {
std::cout << "Error occured in sender: " << e.what() << "\n";
}
}
int main() {
std::string const host = "127.0.0.1";
std::string const port = "9002";
asio::io_context ioc;
tcp::resolver resolver(ioc);
websocket::stream<tcp::socket> ws(ioc);
auto const results = resolver.resolve(host, port);
asio::connect(ws.next_layer(), results);
ws.handshake(host, "/");
ThreadSafeQueue<json> data_queue;
Singleton single(&data_queue, &ws);
std::thread reader(readData, std::ref(single));
std::thread sender(sendData, std::ref(single));
reader.join();
sender.join();
return 0;
}
服务器端:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
WebSocketSession(tcp::socket socket)
: ws_(std::move(socket)) {}
void run() {
ws_.async_accept(
beast::bind_front_handler(
&WebSocketSession::on_accept,
shared_from_this()
)
);
}
private:
void on_accept(beast::error_code ec) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
return;
}
do_read();
}
void do_read() {
ws_.async_read(
buffer_,
beast::bind_front_handler(
&WebSocketSession::on_read,
shared_from_this()
)
);
}
void on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
if (ec == websocket::error::closed) {
return;
}
std::cerr << "Read error: " << ec.message() << std::endl;
return;
}
try {
auto received_message = beast::buffers_to_string(buffer_.data());
json received_json = json::parse(received_message);
std::string response_message;
if (received_json.contains("echo")) {
json response_json = {
{"type", "response"},
{"original", received_json}
};
response_message = response_json.dump();
}
else {
buffer_.consume(buffer_.size());
do_read();
return;
}
response_ptr_ = std::make_shared<std::string>(std::move(response_message));
ws_.text(ws_.got_text());
ws_.async_write(
asio::buffer(*response_ptr_),
beast::bind_front_handler(
&WebSocketSession::on_write,
shared_from_this()));
}
catch (const std::exception& e) {
std::cerr << "Processing error: " << e.what() << std::endl;
}
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
std::cerr << "Write error: " << ec.message() << std::endl;
return;
}
buffer_.consume(buffer_.size());
response_ptr_.reset();
do_read();
}
websocket::stream<tcp::socket> ws_;
beast::flat_buffer buffer_;
std::shared_ptr<std::string> response_ptr_;
};
class WebSocketServer {
public:
WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint)
: acceptor_(ioc, endpoint) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
beast::bind_front_handler(
&WebSocketServer::on_accept,
this
)
);
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
}
else {
std::make_shared<WebSocketSession>(std::move(socket))->run();
}
do_accept();
}
tcp::acceptor acceptor_;
};
int main() {
std::cout << "Web server is running:\n";
try {
asio::io_context ioc;
tcp::endpoint endpoint(tcp::v4(), 9002);
WebSocketServer server(ioc, endpoint);
ioc.run();
}
catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
在我看来,客户端不会崩溃,而只是正确发送一条消息https://i.imgur.com/0zrvFBP.jpeg
在 ASan/UBSan 下运行代码没有发现明显的问题(很好)。
要使客户端成为全双工,您将需要线程或异步。我建议异步,因为它适合库的选择。
从动态缓冲区中消费时要小心:不要只消费整个缓冲区,除非它是当前消息的全部部分。
您需要对传出写入进行排队,因为您不能有重叠的写入,并且显然您的接口是多线程的。
在服务器端,您通过确保在发送响应之前不读取下一个请求来使事情变得简单。我注意到
是多余的,因为它已经是response_ptr_
对象的一部分。所以,我就做到了shared_from_this()
。std::string response_;
服务器备注:
ws_.got_text()
很奇怪,因为你绝对保证 json 文本我建议结合:
文件
client.cpp
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <iostream>
#include <nlohmann/json.hpp>
#include <queue>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
// Thread-safe queue class
template <typename T> class ThreadSafeQueue {
public:
void push(T const& value) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(value);
cond_var_.notify_one();
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(mtx_);
cond_var_.wait(lock, [this] { return !queue_.empty(); });
value = queue_.front();
queue_.pop();
}
private:
std::queue<T> queue_;
mutable std::mutex mtx_;
std::condition_variable cond_var_;
};
using MsgQueue = ThreadSafeQueue<json>;
struct WSClient {
WSClient(std::string const& host, std::string const& port, MsgQueue& inbox) : inbox_(inbox) {
// connect, handshake
connect(ws.next_layer(), tcp::resolver (ioc).resolve(host, port));
ws.handshake(host, "/");
// start async read chain
do_read_loop();
};
void stop() {
beast::get_lowest_layer(ws).cancel();
ioc.join();
}
void send(json&& message) {
asio::post(ws.get_executor(), [this, m = std::move(message)]() mutable { //
outbox_.push_back(std::move(m).dump());
if (outbox_.size() == 1)
do_write_loop(); // start the pump
});
}
private:
asio::thread_pool ioc{1}; // single thread should suffice
beast::flat_buffer incoming_;
MsgQueue& inbox_;
std::deque<std::string> outbox_; // serialized form for buffer stability
websocket::stream<tcp::socket> ws{ioc};
void do_read_loop() {
ws.async_read(incoming_, [this](beast::error_code ec, size_t n) {
std::cout << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
if (ec)
return;
auto received_message = beast::buffers_to_string(incoming_.data()).substr(0, n);
json received_json = json::parse(received_message);
inbox_.push(received_json);
std::cout << "Received from server: " << received_message << std::endl;
incoming_.consume(n);
do_read_loop();
});
}
void do_write_loop() {
if (outbox_.empty())
return;
ws.async_write(asio::buffer(outbox_.front()), [this](beast::error_code ec, size_t n) {
std::cout << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
outbox_.pop_front();
do_write_loop();
}
});
}
};
int main() try {
MsgQueue received;
WSClient client("127.0.0.1", "9002", received);
std::cout << "To send: ";
for (std::string input; std::getline(std::cin, input); std::cout << "To send: ") {
if (input == "stop")
break;
if (input.empty())
continue;
client.send({{input.starts_with('h') ? "echo" : "null", std::move(input)}});
for (json msg; received.try_pop(msg);)
std::cout << " - Processing queued: " << msg << std::endl;
}
client.stop();
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
文件
server.cpp
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <iostream>
#include <nlohmann/json.hpp>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
WebSocketSession(tcp::socket socket) : ws_(std::move(socket)) {}
void run() {
ws_.async_accept(beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
}
~WebSocketSession() { std::cerr << "Session " << peer_ << " closed" << std::endl; }
private:
void on_accept(beast::error_code ec) {
std::cerr << "Accept: " << ec.message() << " for " << peer_ << std::endl;
if (!ec)
do_read_loop();
}
void do_read_loop() {
ws_.async_read(buffer_, beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, size_t n) {
if (ec) {
if (ec == websocket::error::closed) {
return;
}
std::cerr << "Read error: " << ec.message() << std::endl;
return;
}
try {
response_.clear();
auto it = buffers_begin(buffer_.data());
json msg = json::parse(it, it + n);
buffer_.consume(n);
if (msg.contains("echo"))
response_ = json{{"type", "response"}, {"original", std::move(msg)}}.dump();
if (response_.empty()) {
do_read_loop();
} else {
ws_.text(true);
ws_.async_write(asio::buffer(response_),
beast::bind_front_handler(&WebSocketSession::on_write, shared_from_this()));
}
} catch (std::exception const& e) {
std::cerr << "Processing error: " << e.what() << std::endl;
}
}
void on_write(beast::error_code ec, size_t n) {
std::cerr << "Write: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec)
do_read_loop();
}
websocket::stream<tcp::socket> ws_;
beast::flat_buffer buffer_;
std::string response_;
tcp::endpoint peer_ = ws_.next_layer().remote_endpoint();
};
class WebSocketServer {
public:
WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint) : acceptor_(ioc, endpoint) { do_accept(); }
private:
void do_accept() { acceptor_.async_accept(beast::bind_front_handler(&WebSocketServer::on_accept, this)); }
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
} else {
std::make_shared<WebSocketSession>(std::move(socket))->run();
}
do_accept();
}
tcp::acceptor acceptor_;
};
int main() {
std::cout << "Web server is running:\n";
try {
asio::io_context ioc;
tcp::endpoint endpoint(tcp::v4(), 9002);
WebSocketServer server(ioc, endpoint);
ioc.run();
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
}
使用习惯演示: