使用 Boost.Asio 异步 UDP 服务器进行 MSVC 矢量调试断言

问题描述 投票:0回答:1

我正在使用 Boost.Asio 开发跨平台 UDP 异步服务器,并在使用 MSVC (Visual Studio 2022) 编译时出现以下断言错误:

File: <visual studio path>/include/vector
Line: 54

Expression: can't dereference invalidated vector iterator

当使用 GCC 在 Linux 上构建相同的代码时,一切都很好。

这是我的服务器的一个最小示例:

#include <chrono>
#include <cstdint>
#include <format>
#include <iostream>
#include <thread>
#include <vector>
#include <sdkddkver.h>
#include <boost/asio.hpp>

namespace asio = boost::asio;
using asio::ip::udp;
using asio::ip::port_type;

class Server {
public:
    asio::io_context context{};
    const port_type port;

private:
    std::array<char, 4'096> receivedData{};
    udp::socket socket;
    std::mutex mutex{};

    std::atomic<bool> isRunning{ true };
    std::thread IO_Thread{
        [this] {
            try {
                while (this->isRunning.load()) {
                    this->context.run();
                }
            } catch (std::exception& error) {
                std::cerr << error.what() << std::endl;
                std::exit(1);
            }
        }
    };

    void startReceiving() {
        static udp::endpoint clientIP{};

        this->socket.async_receive_from(
            asio::buffer(this->receivedData, 4'096),
            clientIP,
            [this] (std::error_code error, std::size_t bytesReceived) {
                if (error) {
                    throw std::runtime_error{ std::format("Error while receiving data ({} byte(s)): {}", bytesReceived, error.message()) };
                }
                std::lock_guard lock{ this->mutex };
                const std::vector<std::uint8_t> buf{ this->receivedData.cbegin(), this->receivedData.cbegin() + bytesReceived };
                if (buf[0] == 0xFF) {
                    this->send(clientIP, { 0xFF });
                }
                this->receivedData.fill(0);
                this->startReceiving();
            }
        );
    }


public:
    Server(port_type port) :
        port{ port },
        socket{ this->context, udp::endpoint{ udp::v4(), port }  }
    {
        this->startReceiving();
    };

    ~Server() {
        this->isRunning.store(false);
        if (this->IO_Thread.joinable()) {
            this->IO_Thread.join();
        }
    }

    void send(const udp::endpoint& clientIP, const std::vector<std::uint8_t>& msg) {
        this->socket.async_send_to(
            asio::buffer(msg),
            clientIP,
            [this, msg] (std::error_code error, std::size_t bytesSent) {
                if (error) {
                    throw std::runtime_error{ std::format("Error while sending {} byte(s) of data ({} byte(s) effectively sent): {}", msg.size(), bytesSent, error.message()) };
                }
            }
        );
    }
};

int main(int argc, char* argv[]) {
    Server server{ 5'000 };
    std::this_thread::sleep_for(std::chrono::seconds(4));
}

在减少我的实际代码时,我成功地准确定位了问题:

// this raises an assertion error
const std::vector<std::uint8_t> buf{ this->receivedData.cbegin(), this->receivedData.cbegin() + bytesReceived };
if (buf[0] == 0xFF) {
    this->send(clientIP, { 0xFF });
}

// but this doesn't
if (this->receivedData[0] == 0xFF) {
    this->send(clientIP, { 0xFF });
}

我看到

std::vector
不是线程安全的,但我很难理解为什么这是一个问题,因为向量不在多个线程之间共享。另外,我真的不明白为什么我在 Linux 上没有遇到任何问题。

这是 CMakeLists.txt 摘录:

find_package(Boost COMPONENTS asio CONFIG REQUIRED)
if (WIN32)
    target_link_libraries(asio_udp PRIVATE wsock32)
    target_link_libraries(asio_udp PRIVATE Ws2_32)
endif ()
target_link_libraries(asio_udp PRIVATE Boost::asio) # Boost.Asio is statically linked
target_compile_features(asio_udp PRIVATE cxx_std_20)
...
elseif (${CMAKE_CXX_COMPILER_ID} STREQUAL MSVC)
    # Warning C4464 warns about . and .. in #include directive
    target_compile_options(asio_udp PRIVATE /W3 /external:anglebrackets /external:W0 /wd4464)
    set_property(TARGET asio_udp PROPERTY MSVC_RUNTIME_LIBRARY "MultiThreadedDebug")
else ()
...

如果重要的话,这也是我减少的客户:

#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <format>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <sdkddkver.h>
#include <boost/asio.hpp>

namespace asio = boost::asio;
using asio::ip::udp;
using asio::ip::port_type;

class Client {
public:
    asio::io_context context{};

private:
    std::array<char, 4'096> receivedData{};
    std::mutex mutex{};

    std::thread IO_Thread{
        [this] {
            try {
                while (this->isRunning.load()) {
                    this->context.run();
                }
            } catch (std::exception& error) {
                std::cerr << error.what() << std::endl;
            }
        }
    };
    udp::resolver resolver{ this->context };
    udp::socket socket;
    udp::endpoint endpoint;
    udp::endpoint serverEndpoint;

    std::atomic<bool> isRunning{ true };

    void startReceiving() {
        this->socket.async_receive_from(
            asio::buffer(this->receivedData, 4'095),
            this->serverEndpoint,
            [this] (std::error_code error, std::size_t bytesReceived) {
                if (error) {
                    throw std::runtime_error{ std::format("Error while receiving {} byte(s) of data: {}", bytesReceived, error.message()) };
                }

                this->startReceiving();
            }
        );
    }

public:
    Client(const std::string& serverHostName, port_type serverPort) :
        socket{ this->context },
        endpoint{},
        serverEndpoint{ *this->resolver.resolve(udp::v4(), serverHostName, std::to_string(serverPort)).begin() }
    {
        this->socket.open(udp::v4());
        this->socket.send_to(asio::buffer({ 0xFF }), this->serverEndpoint);
        this->startReceiving();
    }

    ~Client() {
        this->send({ 0x00 }); // disconnect
        this->isRunning.store(false);
        if (this->IO_Thread.joinable()) {
            this->mutex.lock();
            this->context.stop(); // discarding potential pending async tasks
            this->mutex.unlock();
            this->IO_Thread.join();
        }
    }

    void send(const std::vector<std::uint8_t>& str) {
        this->socket.async_receive_from(
            asio::buffer(this->receivedData, 4'095),
            this->serverEndpoint,
            [this] (std::error_code error, std::size_t bytesReceived) {
                if (error) {
                    throw std::runtime_error{ std::format("Error while receiving {} byte(s) of data: {}", bytesReceived, error.message()) };
                }

                this->startReceiving();
            }
        );
    }
};

int main() {
    Client client{ "127.0.0.1", 5'000 };
    std::this_thread::sleep_for(std::chrono::seconds(4));
}
visual-c++ udp boost-asio stdvector visual-c++-2022
1个回答
0
投票
  1. 这里有UB:

        socket.async_send_to(
            asio::buffer(msg), clientIP
    

    msg
    是一个局部堆栈变量。
    clientIP
    指的是也用于下一个
    async_receive_from
    的相同静态变量。

  2. 互斥锁保护

    io_context::stop
    没有用(io_context是线程安全的)。

  3. 完成处理程序中围绕缓冲区处理的互斥锁也存在问题:只有一个 IO 线程,完成处理程序始终从该线程运行。

    
    

  4. async_receive_from

    上的

    send
    功能...不发送。
    
    

  5. Client

    上的接收操作会覆盖服务器端点。

    
    

  6. 以下调用不会执行您认为的操作:
  7. Client

    亲自查看:
    https://coliru.stacked-crooked.com/a/d73a3dad5a08a31f

    不要使用

    asio::buffer({0xFF})

    “修复”它 - 这仍然是 UB(缓冲区指向临时)。使用例如

    {'\xFF'}
    代替。
    
    

  8. 考虑一个链,而不是互斥体 - 你有一个逻辑链,因为只有 IO 线程驱动程序,main 只是在等待
  9. 为了获得奖励积分,根本不要聚合线程或上下文。相反,只需传递一个执行器,它让用户决定链/线程/谁运行服务。
  10. 添加最少量的日志记录,以便我们可以看到双向流量:

    文件
  • asio::buffer("\xFF", 1)

    server.cpp

  • 文件
  • #include <boost/asio.hpp> #include <format> #include <iostream> #include <fmt/ranges.h> #include <boost/lexical_cast.hpp> namespace asio = boost::asio; using asio::ip::udp; using asio::ip::port_type; class Server { std::array<char, 4'096> incoming_{}, outgoing_{}; udp::socket socket_; udp::endpoint peer_; void startReceiving() { socket_.async_receive_from(asio::buffer(incoming_), peer_, [this](std::error_code error, size_t n) { if (error) { std::cerr << std::format("Error receiving ({} byte(s) effectively received): {}", n, error.message()); } else { std::string_view buf(incoming_.data(), n); // not copying fmt::print("Received {::#02x} from {}\n", std::span(buf), boost::lexical_cast<std::string>(peer_)); if (buf.starts_with(0xFF)) // safer than buf[0] == 0xFF send(peer_, {0xFF}); startReceiving(); } }); } public: Server(asio::any_io_executor ex, port_type port) // : socket_{ex, udp::endpoint{udp::v4(), port}} { startReceiving(); }; void send(udp::endpoint peer, std::vector<uint8_t> const& msg) { assert(msg.size() < outgoing_.size()); size_t n = std::min(outgoing_.size(), msg.size()); std::copy_n(msg.begin(), n, outgoing_.begin()); socket_.async_send_to( asio::buffer(outgoing_, n), peer, [n](std::error_code error, size_t sent) { if (error) std::cerr << std::format( "Error while sending {} byte(s) of data ({} byte(s) effectively sent): {}", n, sent, error.message()); }); } }; int main() { asio::io_context ioc; Server server{ioc.get_executor(), 5'000}; ioc.run_for(std::chrono::seconds(4)); }

    client.cpp

    
    

© www.soinside.com 2019 - 2024. All rights reserved.