我正在为嵌入式系统开发一个 websoscket 服务器。要求非常简单:将更新发送到 web 应用程序订阅的信号 ID 列表。一切正常,直到崩溃发生。 这个问题似乎是软木塞机制的一部分,我尝试在不同的条件下重现它,但问题仍然出现。 在我的实现中,我有一个自定义的发布/订阅,因为我想在一条消息中一次传递更多信号更新:这是在一个单独的线程中实现的,该线程以每秒 10 次的频率向连接的客户端发送异步消息(使用可用的更新)。 当发送和消息回调发生冲突时,似乎就会出现问题。
这是我的应用程序设置,与默认设置非常相似:
auto app
= uWS::App()
.ws<ClientData>(
"/*", { /* Settings */
.compression = uWS::CompressOptions(uWS::DEDICATED_COMPRESSOR_4KB | uWS::DEDICATED_DECOMPRESSOR),
.maxPayloadLength = 100 * 1024 * 1024,
.idleTimeout = 16,
.maxBackpressure = 100 * 1024 * 1024,
.closeOnBackpressureLimit = false,
.resetIdleTimeoutOnSend = false,
.sendPingsAutomatically = true,
/* Handlers */
// handshaking callback
.upgrade =
[this](auto *res, auto *req, auto *context)
{
auto subprotocols = String(req->getHeader("sec-websocket-protocol"));
auto jwt = this->GetJwtFromSubprotocols(subprotocols);
if(!this->ValidateJwt(jwt))
{
res->writeStatus("401");
res->end("Invalid jwt");
return;
}
if(this->IsMaximumCapacityReached())
{
res->writeStatus("503");
res->end("Maximum number of clients reached");
return;
}
res->upgrade(ClientData{}, req->getHeader("sec-websocket-key"),
req->getHeader("sec-websocket-protocol"), req->getHeader("sec-websocket-extensions"),
context);
},
.open = [this](auto *ws) { this->OnOpen(ws); },
.message = [this](auto *ws, std::string_view message, uWS::OpCode) { this->OnMessage(ws, message); },
.dropped =
[](auto * /*ws*/, std::string_view /*message*/, uWS::OpCode /*opCode*/)
{
/* A message was dropped due to set maxBackpressure and closeOnBackpressureLimit limit */
},
.drain =
[](auto * /*ws*/)
{
/* Check ws->getBufferedAmount() here */
},
.ping =
[](auto * /*ws*/, std::string_view)
{
/* Not implemented yet */
},
.pong =
[](auto * /*ws*/, std::string_view)
{
/* Not implemented yet */
},
.close = [this](auto *ws, int /*code*/, std::string_view /*message*/) { this->OnClose(ws); } })
.listen("127.0.0.1", 50000,
[](auto *listen_socket)
{
if(listen_socket)
{
LogDebug("Ready on port 50000");
}
});
Sender 线程循环执行受互斥锁保护的这段代码:
Sync lock(*_clientsMutex);
for(auto client : *_clients)
{
SendRealtimeUpdate(client);
}
经过一番阐述后,
SendRealtimeUpdate
方法提供了一条消息作为std::string并发送它client->send(msg, uWS::OpCode::TEXT);
我想我错过了一些东西,也许在消息回调和我的线程发送之间需要实现适当的同步,这可能随时发生。
我尝试使用 uWebSockets pub/sub 内部机制,使用信号 id 作为主题,它会在准备好时自动管理发送,并且这有效。然而,这是低效的,因为我想在消息中发送多个信号更新而不是一个。
Swift - Friday Pie 的评论是正确的。 除了极少数功能(例如计时器和“延迟”)之外,uWebSockets 不是线程安全。
您的评论询问:
需要每 1 秒向所有连接的客户端发送一条消息
下面是使用定时器发送消息的线程安全方式。
#include "App.h"
using namespace std;
struct PerSocketData {};
uWS::WebSocket<false, true, PerSocketData> *gws=nullptr;
int main() {
auto loop = uWS::Loop::get();
struct us_timer_t *delayTimer = us_create_timer((struct us_loop_t *) loop, 0, 0);
us_timer_set(delayTimer, [](struct us_timer_t *) {
if (gws) {
cout << "calling send" << endl;
gws->send("from server", uWS::OpCode::TEXT);
}
}, 1000, 1000);
uWS::App app;
app.ws<PerSocketData>("/*", {
.idleTimeout = 0,
.sendPingsAutomatically = false,
.open = [](auto *ws) {
gws = ws;
},
.close = [](auto */*ws*/, int /*code*/, std::string_view /*message*/) {
gws = nullptr;
}
}).listen(9001, [](auto *) {
});
app.run();
}