从 COM 端口读取数据 - 缓冲区溢出

问题描述 投票:0回答:1

我在处理来自 COM 端口的数据包时遇到问题。该端口的读取速度为115200。该端口的大量数据丢失。缓冲区几乎立即溢出。在我附加的代码中,我尝试即时删除过度处理的缓冲区。

void COM::ReadLoop(std::shared_ptr<std::vector<unsigned char>> pbuf) {
    if (!pbuf) {
        assert(m_readoperation.expired()); // do not post overlapping read operations
        pbuf = std::make_shared<std::vector<unsigned char>>();
        m_readoperation = pbuf;
    }

    asio::async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
        [this, pbuf](boost::system::error_code ec, size_t /*length*/) {
            if (!ec) {
                auto& buffer = *pbuf;
                size_t const length = buffer.size();
            
                if (length > 1024) {
                    g_logger.info("buffer overflow");
                    buffer.erase(buffer.begin(), buffer.begin() + length - 1024); 
                }
                m_Data.store(length);

                size_t i = 0;
                while (i < buffer.size() - 2) {
                    if (buffer[i] == 170 && buffer[i + 1] == 170) {
                        size_t len = buffer[i + 2];
                        if (len < 4 || len >= 170 || i + 3 + len > buffer.size()) {
                            i++; //We move the index to continue searching
                            continue;
                        }
                        std::vector<unsigned char> packetData(buffer.begin() + i + 3, buffer.begin() + i + 3 + len);
                        parse_packet(packetData);
                        buffer.erase(buffer.begin(), buffer.begin() + i + 3 + len);
                        i = 0;
                        previousSize = buffer.size();

                    }
                    else {
                        i++;
                    }
                    if (buffer.size() == previousSize) {
                             // If the buffer has not been modified, we continue processing
                             // This prevents infinite looping when there is no more data to process
                        break;
                    }
                }

                ReadLoop(pbuf); 
            }
            else {
                g_logger.info("ReadLoop error: " + ec.message());
            }
        });
}





void COM::parse_packet(const std::vector<unsigned char>& data) {

    if (data.size() < 4) // Minimal packet length check
        return;

    int generated_checksum = 0;
    for (size_t i = 0; i < data.size() - 1; ++i) { // Omit the checksum byte in calculations
        generated_checksum += data[i];
    }
    generated_checksum = 255 - (generated_checksum % 256);

    int checksum = data.back(); // Assume the last byte is the checksum

    if (checksum != generated_checksum) {
        return; // Checksum mismatch
    }


    int i = 0;
    while (i < data.size() - 1) {
        unsigned char data_type = data[i++];
        g_logger.info(std::to_string(data_type));

        switch (data_type) {
            g_logger.info(std::to_string(data_type));
        case 1: { 
            unsigned char battery_level = data[i++];
            m_BatteryLevel.store(battery_level);
            break;
        }
        // here case 2-5
   
        case 0x80: { // Raw data
            i += 2; // We ignore the byte length
            int raw_data = data[i++] << 8;
            raw_data += data[i++];
            if (raw_data >= 32768) {
                raw_data -= 65536;
            }
            m_Raw.store(raw_data);
        }
        case 0x83: { // Power
            std::vector<int> g_power;
            std::ostringstream oss;

            for (int j = 0; j < 8; ++j) {
                int power = data[i + 2] + (data[i + 1] << 8) + (data[i] << 16);
                if (power > 8388608) {
                    power -= 16777216;
                }
                g_power.push_back(power);
                i += 3;
            }
            for (auto power : g_power) {
                oss << power << " ";
            }
            m_PowerG = oss.str();

            break;
        }
                
        default: {
            break;
        }
        }
    }
}

顺便说一句,我有一个用Python编写的程序,可以轻松读取和处理包。

运行程序几秒钟

Initialize port: COM8
Port initialized successfully.
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
buffer overflow
buffer overflow
buffer overflow
buffer overflow
buffer overflow
128
buffer overflow
128
128
buffer overflow
128
128

问题 如何从缓冲区读取数据使得缓冲区不溢出?

小更新

我找到了原始代码。它是用Delphi 编写的。这里有一些逻辑

  if l>512 then l:=l1;
     if l>512 then begin
        ReceivedString := '';
        UnparsedRemainingString:='';
        CurrentPosition:=0;
        l:=0;
     end;
  UnparsedRemainingString:='';
  for i:=1 to l-2 do
     if ReceiveByte()=170 then if ReceiveByte()=170 then begin
        len:=ReceiveByte();
        if len<4 then continue;
        if len>=170 then continue;
        if CurrentPosition+len>Length(ReceivedString) then begin DropParsing; break; end else ParsePacket(len);
     end;
end;

function ReceiveByte() : byte;
begin
  Result:=0;
  if CurrentPosition>=Length(ReceivedString) then exit;
  Result:=byte(ReceivedString[CurrentPosition]);
  CurrentPosition:=CurrentPosition+1;
end;

Procedure SkipByte();
begin
 CurrentPosition:=CurrentPosition+1;
end;

Procedure DropParsing();
var i: integer;
begin
  UnparsedRemainingString:='';
  if CurrentPosition>3 then begin
     for i:=CurrentPosition-3 to Length(ReceivedString)
        do UnparsedRemainingString:=UnparsedRemainingString+ReceivedString[i];
  end else UnparsedRemainingString:='';
  if length(UnparsedRemainingString)>128 then UnparsedRemainingString:='';
  ReceivedString:='';
end;

这是我用 C++ 编写的实际代码

    asio::async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
        [this, pbuf](boost::system::error_code ec, size_t /*length*/) {
            if (!ec) {
                auto& buffer = *pbuf;
                size_t const length = buffer.size();
                // Przygotowanie do ponownego odczytu, jeśli długość bufora przekracza 512 bajtów
                m_Data.store(length);
                size_t i = 0;
                while (i < buffer.size() - 2) {
                    if (buffer[i] == 170 && buffer[i + 1] == 170) {
                        size_t len = buffer[i + 2];
                        if (len < 4 || len >= 170 || i + 3 + len > buffer.size()) {
                            i++; // Przesuwamy indeks, aby kontynuować przeszukiwanie
                            continue;
                        }
                        std::vector<unsigned char> packetData(buffer.begin() + i + 3, buffer.begin() + i + 3 + len);
                        parse_packet(packetData);
                        i += 3 + len;


                    }
                    else {
                        i++;
                    }

                }
                

                if (length > 32768) {
                    g_logger.info("buffer overflow");
                    buffer.erase(buffer.begin(), buffer.begin() + length - 32768); // Zachowujemy ostatnie 512 bajtów
                }
                

                ReadLoop(pbuf); // Kontynuacja pętli odczytu
            }
            else {
                g_logger.info("ReadLoop error: " + ec.message());
            }
        });
}



128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128
2
buffer overflow
128
2
128
2
128
2
128

我仍然不知道如何删除缓冲区。而且我不确定是否应该增加

  1. 并且 += len - 1;
  2. 且 += 3 + len;
c++ serial-port boost-asio com-port
1个回答
0
投票

我将按照其可能的意图来解释这个问题:“如何读取可能以不可预测的传输数据包到达的连续逻辑消息流”。

我可能会将所有逻辑提取到方便的函数中,这样您就可以用代码表达您的意图:

   m_Data.store(pbuf->size());

   // Directly process binary data from buffer here.
   View remain(*pbuf);
   while (auto msg = scan_for_message(remain))
       process(*msg);

   pbuf->erase(pbuf->begin(), pbuf->end() - remain.length());
   // Continue reading into the same buffer for new data.
   ReadLoop(pbuf);

帮助者可以是:

using View = std::basic_string_view<uint8_t>;

constexpr uint8_t calc_checksum(View data) {
    unsigned cksum = 0;
    for (uint8_t b : data)
        cksum += b;
    return 255 - (cksum % 256);
}

这为我们提供了一种验证消息有效负载的简单方法:

constexpr bool validate_message_payload(View payload) // signature and length not included
{
    // size
    if (payload.size() < 4)
        return false; // Minimal packet length

    // checksum
    {
        uint8_t const expected = payload.back();
        payload.remove_suffix(1); // Exclude checksum byte
        uint8_t actual = calc_checksum(payload);

        g_logger.info("Checksum expected: " + std::to_string(expected) +
                      ", actual: " + std::to_string(actual));

        if (expected != actual) {
            g_logger.info("Checksum failed");
            return false;
        }
    }

    return true;
}

现在我们可以实现

scan_for_message
:

// Finds the first *valid* message, discarding preceding data.
// Returns the payload data span or nullopt if none found.
constexpr std::optional<View> scan_for_message(View& buffer) {
    while (!buffer.empty()) {
        // Find signature
        static constexpr std::array<uint8_t, 2> signature{0xaa, 0xaa};
        {
            View constexpr sigspan(signature.data(), signature.size());
            auto pos = buffer.find(sigspan);

            if (pos == View::npos) {
                buffer.remove_prefix(buffer.length()); // none of the buffer is relevant
                return {};
            }

            buffer.remove_prefix(pos); // skip noise/partial message debris
        }
        auto candidate = buffer.substr(signature.size());

        // Get length
        if (candidate.empty())
            return {}; // incomplete

        unsigned payload_len = candidate.front();

        if (candidate.length() < payload_len + 1)
            return {}; // incomplete

        View payload = candidate.substr(1, payload_len); // skips length byte

        if (validate_message_payload(payload)) {
            // remove from buffer, and return payload
            buffer.remove_prefix(signature.size() + 1 + payload_len);
            auto data = payload.substr(0, payload.length() - 1); // throw away checksum
            return data;
        } else {

            // Not valid.

            // Note: we don't skip the entire payload, just the current header signature, because it might be
            // valid part of a previous partial message, and a next, valid message might appear in the middle
            // of the payload we just tried to validate
            buffer.remove_prefix(signature.size());
        }
    }

    return {};
}

这样做的重点是负责任地消耗缓冲区内容,同时保留潜在的不完整消息。

现场演示

#include <boost/asio.hpp>
#include <fmt/ranges.h>
#include <fmt/printf.h>
#include <iostream>
using namespace std::chrono_literals;
using namespace std::string_literals;
namespace asio = boost::asio;
using std::this_thread::sleep_for;

// mocking question code
struct {
    void info(std::string const& msg) const {
        static auto const start = std::chrono::steady_clock::now();

        auto ts = (std::chrono::steady_clock::now() - start);
        if (ts > 1s)
            std::cout << std::setw(6) << ts / 1.s << "s  " << msg << std::endl;
        else
            std::cout << std::setw(6) << ts / 1ms << "ms " << msg << std::endl;
    }
} static g_logger;
// end mocks

using View = std::basic_string_view<uint8_t>;

constexpr uint8_t calc_checksum(View data) {
    unsigned cksum = 0;
    for (uint8_t b : data)
        cksum += b;
    return 255 - (cksum % 256);
}

constexpr bool validate_message_payload(View payload) // signature and length not included
{
    // size
    if (payload.size() < 4)
        return false; // Minimal packet length

    // checksum
    {
        uint8_t const expected = payload.back();
        payload.remove_suffix(1); // Exclude checksum byte
        uint8_t actual = calc_checksum(payload);

        g_logger.info("Checksum expected: " + std::to_string(expected) +
                      ", actual: " + std::to_string(actual));

        if (expected != actual) {
            g_logger.info("Checksum failed");
            return false;
        }
    }

    return true;
}

// Finds the first *valid* message, discarding preceding data.
// Returns the payload data span or nullopt if none found.
constexpr std::optional<View> scan_for_message(View& buffer) {
    while (!buffer.empty()) {
        // Find signature
        static constexpr std::array<uint8_t, 2> signature{0xaa, 0xaa};
        {
            View constexpr sigspan(signature.data(), signature.size());
            auto pos = buffer.find(sigspan);

            if (pos == View::npos) {
                buffer.remove_prefix(buffer.length()); // none of the buffer is relevant
                return {};
            }

            buffer.remove_prefix(pos); // skip noise/partial message debris
        }
        auto candidate = buffer.substr(signature.size());

        // Get length
        if (candidate.empty())
            return {}; // incomplete

        unsigned payload_len = candidate.front();

        if (candidate.length() < payload_len + 1)
            return {}; // incomplete

        View payload = candidate.substr(1, payload_len); // skips length byte

        if (validate_message_payload(payload)) {
            // remove from buffer, and return payload
            buffer.remove_prefix(signature.size() + 1 + payload_len);
            auto data = payload.substr(0, payload.length() - 1); // throw away checksum
            return data;
        } else {

            // Not valid.

            // Note: we don't skip the entire payload, just the current header signature, because it might be
            // valid part of a previous partial message, and a next, valid message might appear in the middle
            // of the payload we just tried to validate
            buffer.remove_prefix(signature.size());
        }
    }

    return {};
}

struct COM {
    asio::thread_pool m_ioc{1};
    asio::serial_port m_port{m_ioc};
    std::atomic_int   m_Data = 0;

    std::weak_ptr<void> m_readoperation; // only when read operation active

    ~COM() { m_ioc.join(); }

    void ReadLoop(std::shared_ptr<std::vector<unsigned char>> pbuf = {}) {
        // g_logger.info("Readloop" + (pbuf ? " with buffer"s : ""));
        if (!pbuf) {
            assert(m_readoperation.expired()); // do not post overlapping read operations
            pbuf          = std::make_shared<std::vector<unsigned char>>();
            m_readoperation = pbuf;
        }

        async_read(m_port, asio::dynamic_buffer(*pbuf), asio::transfer_at_least(1),
                   [this, pbuf](boost::system::error_code ec, size_t /*length*/) {
                       if (!ec) {
                           try {
                               m_Data.store(pbuf->size());

                               // Directly process binary data from buffer here.
                               View remain(*pbuf);
                               while (auto msg = scan_for_message(remain))
                                   process(*msg);

                               pbuf->erase(pbuf->begin(), pbuf->end() - remain.length());
                               // Continue reading into the same buffer for new data.
                               ReadLoop(pbuf);
                           } catch (const std::exception& e) {
                               g_logger.info("Error processing received data: " + std::string(e.what()));
                           }
                       } else {
                           // Handle read errors.
                           g_logger.info("ReadLoop error: " + ec.message());
                       }
                   });
    }

    void process(View data) { // already validated
        fmt::print("Process valid message: {::#02x}\n", std::vector(data.begin(), data.end()));
        for (uint8_t ch : data) {
            switch (ch) {
            case 1: { // Battery level
                      //...
            }
            }
        }
    }

    void initialize(bool enable = true) {
        sleep_for(100ms);

        m_port.open("COM5");

        // Configure serial port settings
        using P = asio::serial_port;
        m_port.set_option(P::baud_rate(115200));
        m_port.set_option(P::character_size(8));
        m_port.set_option(P::parity(P::parity::none));
        m_port.set_option(P::stop_bits(P::stop_bits::one));
        m_port.set_option(P::flow_control(P::flow_control::none));

        g_logger.info("Connected");

        setEnabled(enable);
    }

    void setEnabled(bool enable) {
        if (enable == !m_readoperation.expired())
            return; // nothing to do

        if (!enable)
            m_port.cancel(); // cancels read operation (asio::error::operation_aborted)
        else
            ReadLoop();
    }

    void terminate() {
        setEnabled(false);
        g_logger.info("Bye!");
    }
};

// demo code
int main() {
    std::cout << std::fixed << std::setprecision(2);
    g_logger.info("Demo start");
    COM g_com;

    g_com.initialize();
    // g_com.terminate();
}

通过一些繁重的测试,向其抛出大量随机二进制数据,这会产生“几乎有效的消息”,但当然很少会通过校验和测试,但接受手动测试消息:

有趣的是,最后发现一条“有效”消息是随机生成的:)

Process valid message: [0xf5, 0x82, 0x7, 0x94, 0x24, 0x6f, 0x4b, 0x5, 0xf5, 0x56, 0xf3, 0x72, 0x39, 0x3b, 0x97]                                                                                                                                
© www.soinside.com 2019 - 2024. All rights reserved.