uWebSockets send() 上的 C++ 错误:在未检查 cancork 的情况下不得获取 cork 缓冲区

问题描述 投票:0回答:1

我正在为嵌入式系统开发一个 websoscket 服务器。要求非常简单:将更新发送到 web 应用程序订阅的信号 ID 列表。一切正常,直到崩溃发生。 这个问题似乎是软木塞机制的一部分,我尝试在不同的条件下重现它,但问题仍然出现。 在我的实现中,我有一个自定义的发布/订阅,因为我想在一条消息中一次传递更多信号更新:这是在一个单独的线程中实现的,该线程以每秒 10 次的频率向连接的客户端发送异步消息(使用可用的更新)。 当发送和消息回调发生冲突时,似乎就会出现问题。

这是我的应用程序设置,与默认设置非常相似:

    auto app
        = uWS::App()
              .ws<ClientData>(
                  "/*", { /* Settings */
                          .compression      = uWS::CompressOptions(uWS::DEDICATED_COMPRESSOR_4KB | uWS::DEDICATED_DECOMPRESSOR),
                          .maxPayloadLength = 100 * 1024 * 1024,
                          .idleTimeout      = 16,
                          .maxBackpressure  = 100 * 1024 * 1024,
                          .closeOnBackpressureLimit = false,
                          .resetIdleTimeoutOnSend   = false,
                          .sendPingsAutomatically   = true,
                          /* Handlers */
                          // handshaking callback
                          .upgrade =
                              [this](auto *res, auto *req, auto *context)
                          {
                              auto subprotocols = String(req->getHeader("sec-websocket-protocol"));
                              auto jwt          = this->GetJwtFromSubprotocols(subprotocols);

                              if(!this->ValidateJwt(jwt))
                              {
                                  res->writeStatus("401");
                                  res->end("Invalid jwt");
                                  return;
                              }

                              if(this->IsMaximumCapacityReached())
                              {
                                  res->writeStatus("503");
                                  res->end("Maximum number of clients reached");
                                  return;
                              }

                              res->upgrade(ClientData{}, req->getHeader("sec-websocket-key"),
                                           req->getHeader("sec-websocket-protocol"), req->getHeader("sec-websocket-extensions"),
                                           context);
                          },
                          .open    = [this](auto *ws) { this->OnOpen(ws); },
                          .message = [this](auto *ws, std::string_view message, uWS::OpCode) { this->OnMessage(ws, message); },
                          .dropped =
                              [](auto * /*ws*/, std::string_view /*message*/, uWS::OpCode /*opCode*/)
                          {
                              /* A message was dropped due to set maxBackpressure and closeOnBackpressureLimit limit */
                          },
                          .drain =
                              [](auto * /*ws*/)
                          {
                              /* Check ws->getBufferedAmount() here */
                          },
                          .ping =
                              [](auto * /*ws*/, std::string_view)
                          {
                              /* Not implemented yet */
                          },
                          .pong =
                              [](auto * /*ws*/, std::string_view)
                          {
                              /* Not implemented yet */
                          },
                          .close = [this](auto *ws, int /*code*/, std::string_view /*message*/) { this->OnClose(ws); } })
              .listen("127.0.0.1", 50000,
                      [](auto *listen_socket)
                      {
                          if(listen_socket)
                          {
                              LogDebug("Ready on port 50000");
                          }
                      });

Sender 线程循环执行受互斥锁保护的这段代码:

Sync lock(*_clientsMutex);
for(auto client : *_clients)
{
    SendRealtimeUpdate(client);
}

经过一番阐述后,

SendRealtimeUpdate
方法提供了一条消息作为std::string并发送它
client->send(msg, uWS::OpCode::TEXT);

我想我错过了一些东西,也许在消息回调和我的线程发送之间需要实现适当的同步,这可能随时发生。

我尝试使用 uWebSockets pub/sub 内部机制,使用信号 id 作为主题,它会在准备好时自动管理发送,并且这有效。然而,这是低效的,因为我想在消息中发送多个信号更新而不是一个。

c++ websocket c++17 uwebsockets
1个回答
0
投票

Swift - Friday Pie 的评论是正确的。 除了极少数功能(例如计时器和“延迟”)之外,uWebSockets 不是线程安全

您的评论询问:

需要每 1 秒向所有连接的客户端发送一条消息

下面是使用定时器发送消息的线程安全方式。

#include "App.h"
   
using namespace std;

struct PerSocketData {};

uWS::WebSocket<false, true, PerSocketData> *gws=nullptr;
    
int main() {
   auto loop = uWS::Loop::get();

   struct us_timer_t *delayTimer = us_create_timer((struct us_loop_t *) loop, 0, 0);

   us_timer_set(delayTimer, [](struct us_timer_t *) {
                               if (gws) {
                                  cout << "calling send" << endl;
                                  gws->send("from server", uWS::OpCode::TEXT);
                               }
                            }, 1000, 1000);
   
   uWS::App app;

   app.ws<PerSocketData>("/*", {
         .idleTimeout = 0,
         .sendPingsAutomatically = false,
         .open = [](auto *ws) {
                    gws = ws;
                 },
         .close = [](auto */*ws*/, int /*code*/, std::string_view /*message*/) {
                     gws = nullptr;
                  }
      }).listen(9001, [](auto *) {
                      });

   app.run();
}
© www.soinside.com 2019 - 2024. All rights reserved.