通过 boost.interprocess 发送序列化结构的正确方法是什么?

问题描述 投票:0回答:1

我正在尝试使用 boost interprocess 通过共享内存发送数据结构。我想序列化数据并将其作为二进制发送。

我的方法基于这个答案:

通过boost消息队列发送复杂的数据结构

但是,一旦我把消息类型换成结构体,数据就发送不出去了。代码可以正常编译,首次运行也没有问题;但是,当我关闭接收端程序并重新打开之后,数据就不再被发送/接收。我该怎么做,才能让接收端可以随时关闭和重新打开,并自动重新建立连接?

#DataStruct.hpp

#ifndef DATASTRUCT_HPP
#define DATASTRUCT_HPP

#include <string>
#include <vector>

#include <boost/serialization/vector.hpp>

// Size limit for one serialized message: used as the queue's per-message size and as the receive buffer size.
#define MAX_SIZE 150000 

namespace dataStruct {

    

    /// A single 3D point stored as three single-precision coordinates.
    struct VUserPoint
    {
        float PositionX;
        float PositionY;
        float PositionZ;

        /// Passes X, Y and Z (in that order) through the archive's `&` operator.
        /// The version argument is accepted but not used.
        template <typename Archive>
        void serialize(Archive& ar, const unsigned int /*version*/)
        {
            ar & PositionX & PositionY & PositionZ;
        }
    };



    /// One frame of point data together with its frame counter.
    struct FvpData
    {
        // Points belonging to the current frame
        std::vector<VUserPoint> UserPoints;
        int frameNumber;

        /// Passes the point list, then the frame number, through the archive's `&` operator.
        /// The version argument is accepted but not used.
        template <typename Archive>
        void serialize(Archive& ar, const unsigned int /*version*/)
        {
            ar & UserPoints & frameNumber;
        }
    };



} // namespace 

#endif // DATASTRUCT_HPP

#sender.cpp

#include "DataSender.h"
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>

 
using namespace boost::interprocess;

int num = 1;

/// Packs one frame of points into a dataStruct::FvpData, serializes it with a
/// Boost binary archive and sends the bytes on the message queue named "mq".
///
/// @param points  Coordinates of the frame; each component is narrowed from
///                double to float before it is stored.
///
/// Every call increments the global counter `num` and uses the new value as the
/// frame number. A boost::interprocess::interprocess_exception is caught and its
/// message is written to std::cerr; any other exception propagates to the caller.
void DataSender::Grab(std::vector<Eigen::Vector3d> points)
{
    dataStruct::FvpData data;
    data.UserPoints.reserve(points.size());
    for (const Eigen::Vector3d& point : points)
    {
        dataStruct::VUserPoint pnt;

        // The wire format uses float, so make the narrowing explicit.
        pnt.PositionX = static_cast<float>(point.x());
        pnt.PositionY = static_cast<float>(point.y());
        pnt.PositionZ = static_cast<float>(point.z());

        data.UserPoints.push_back(pnt);
    }
    num += 1;
    data.frameNumber = num;

    try
    {
        message_queue mq
        (
            open_or_create,
            "mq",
            100,
            MAX_SIZE
        );

        std::stringstream oss;
        {
            // Destroy the archive before reading the stream so that everything
            // it writes is in `oss` by the time str() is called.
            boost::archive::binary_oarchive oa(oss);
            oa << data;
        }

        const std::string serialized_string(oss.str());
        mq.send(serialized_string.data(), serialized_string.size(), 1);

        std::cout << data.frameNumber << std::endl;
    }
    catch (interprocess_exception& ex)
    {
        std::cerr << ex.what() << std::endl;
    }
}

#receiver.cpp

#include <iostream>
#include <string>
#include <iostream>

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>

#include "DataStruct.hpp"

using namespace boost::interprocess;


int main()
    {

    
        try
        {
            message_queue mq
    (
        open_or_create,
        "mq",
        100,
        MAX_SIZE
    );

            message_queue::size_type recvd_size;
            unsigned int priority;

            while (true)
            {
                dataStruct::FvpData me;

                std::stringstream iss;
                std::string serialized_string;
                serialized_string.resize(MAX_SIZE);
                mq.receive(&serialized_string[0], MAX_SIZE, recvd_size, priority);
                iss << serialized_string;

                boost::archive::binary_iarchive ia(iss);
                ia >> me;

                std::cout << me.frameNumber << std::endl;
                //   std::cout << me.name << std::endl;
            }
        }
        catch (interprocess_exception& ex)
        {
            std::cerr << ex.what() << std::endl;
        }

      //  message_queue::remove("mq");
    
 }
c++ boost interprocess
1个回答
0
投票

首先,我简化并扩充了您的复现示例:

  • 文件

    datastruct.hpp

     #pragma once
     #include <boost/serialization/vector.hpp>
     // Size limit for one serialized message: used as the queue's per-message size and as the receive buffer size.
     // `inline constexpr` without `static` gives a single definition shared by every translation unit.
     inline constexpr size_t MAX_SIZE = 150'000;
    
     namespace dataStruct {
         /// A single 3D point stored as three single-precision coordinates.
         struct VUserPoint {
             float PositionX;
             float PositionY;
             float PositionZ;

             /// Passes X, Y and Z (in that order) through the archive's `&` operator.
             template <typename Archive> void serialize(Archive& archive, unsigned /*version*/) {
                 archive & PositionX;
                 archive & PositionY;
                 archive & PositionZ;
             }
         };
    
         /// One frame of point data together with its frame counter.
         struct FvpData {
             // Points belonging to the current frame
             std::vector<VUserPoint> UserPoints;
             int                     frameNumber;

             /// Passes the point list, then the frame number, through the archive's `&` operator.
             template <typename Archive> void serialize(Archive& archive, unsigned /*version*/) {
                 archive & UserPoints;
                 archive & frameNumber;
             }
         };
    
     } // namespace dataStruct
    
  • 文件

    sender.cpp

     #include "datastruct.hpp"
     #include <Eigen/Core>
     #include <boost/archive/binary_iarchive.hpp>
     #include <boost/archive/binary_oarchive.hpp>
     #include <boost/archive/text_oarchive.hpp>
     #include <boost/interprocess/ipc/message_queue.hpp>
     #include <iostream>
    
     namespace bip = boost::interprocess;
    
     static int num = 1;
    
     void Grab(std::vector<Eigen::Vector3d> points) {
         dataStruct::FvpData data{{}, ++num};
         for (auto const& p : points)
             data.UserPoints.push_back(dataStruct::VUserPoint{
                 .PositionX = static_cast<float>(p.x()),
                 .PositionY = static_cast<float>(p.y()),
                 .PositionZ = static_cast<float>(p.z()),
             });
    
         try {
             bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
    
             std::stringstream oss;
    
             {
                 boost::archive::binary_oarchive oa(oss);
                 oa << data;
             } // completes the archive!
    
             std::string serialized_string(oss.str());
             mq.send(serialized_string.data(), serialized_string.size(), 1);
    
             std::cout << data.frameNumber << std::endl;
         } catch (bip::interprocess_exception const& ex) {
             std::cerr << ex.what() << std::endl;
         }
     }
    
     #include <random>
     #include <thread>
     int main() {
         std::cout << std::fixed << std::setprecision(3);
    
         std::mt19937 gen(std::random_device{}());
         std::uniform_real_distribution<double> dist(-10, 10);
    
         for (int n = 100; n--; std::this_thread::sleep_for(std::chrono::seconds(1))) {
             std::vector<Eigen::Vector3d> points;
             for (int i = 0; i < 3; ++i) {
                 auto x = dist(gen), y = dist(gen), z = dist(gen);
                 points.emplace_back(x, y, z);
                 std::cout << x << " " << y << " " << z << std::endl;
             }
    
             Grab(points);
         }
     }
    
  • 文件

    receiver.cpp

     #include "datastruct.hpp"
     #include <boost/archive/binary_iarchive.hpp>
     #include <boost/archive/binary_oarchive.hpp>
     #include <boost/archive/text_iarchive.hpp>
     #include <boost/interprocess/ipc/message_queue.hpp>
     #include <iomanip>
     #include <iostream>
     #include <sstream>
     #include <string>
    
     namespace bip = boost::interprocess;
    
     int main() {
         std::cout << std::fixed << std::setprecision(3);
    
         try {
             bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
    
             bip::message_queue::size_type recvd_size;
             unsigned int                  priority;
    
             while (true) {
                 dataStruct::FvpData me;
    
                 std::stringstream iss;
                 std::string       serialized_string;
                 serialized_string.resize(MAX_SIZE);
                 mq.receive(&serialized_string[0], MAX_SIZE, recvd_size, priority);
                 iss << serialized_string;
    
                 boost::archive::binary_iarchive ia(iss);
                 ia >> me;
    
                 std::cout << me.frameNumber << std::endl;
                 for (auto const& p : me.UserPoints)
                     std::cout << p.PositionX << " " << p.PositionY << " " << p.PositionZ << "\n";
    
             }
         } catch (bip::interprocess_exception const& ex) {
             std::cerr << ex.what() << std::endl;
         }
    
         //  message_queue::remove("mq");
     }
    

接下来,我确认可以正常运行。

问题:安全停止

您没有说明是如何停止程序的。由于所示代码中没有任何退出逻辑,我只能假设您是用 Ctrl-C(或更粗暴的方式)终止进程的:

如您所见,这样做会触发与锁有关的异常,因此程序需要干净地退出。您可以自己实现优雅关闭的逻辑,也可以通过信号处理来拦截 Ctrl-C/终止信号:

std::atomic_bool shutdown{false};

boost::asio::thread_pool io{1};
boost::asio::signal_set  sigs(io, SIGINT, SIGTERM);
sigs.async_wait([&](boost::system::error_code const& ec, int num) {
    std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
    shutdown = true;
});

例如,调整后的 sender.cpp:

#include "datastruct.hpp"
#include <Eigen/Core>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>

namespace bip = boost::interprocess;

static int num = 1;

void Grab(std::vector<Eigen::Vector3d> points) {
    dataStruct::FvpData data{{}, ++num};
    for (auto const& p : points)
        data.UserPoints.push_back(dataStruct::VUserPoint{
            .PositionX = static_cast<float>(p.x()),
            .PositionY = static_cast<float>(p.y()),
            .PositionZ = static_cast<float>(p.z()),
        });

    try {
        bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);

        std::stringstream oss;

        {
            boost::archive::binary_oarchive oa(oss);
            oa << data;
        } // completes the archive!

        std::string serialized_string(oss.str());
        mq.send(serialized_string.data(), serialized_string.size(), 1);

        std::cout << data.frameNumber << std::endl;
    } catch (bip::interprocess_exception const& ex) {
        std::cerr << ex.what() << std::endl;
    }
}

#include <random>
#include <thread>
int main() {
    std::cout << std::fixed << std::setprecision(3);

    std::mt19937                           gen(std::random_device{}());
    std::uniform_real_distribution<double> dist(-10, 10);

    std::atomic_bool shutdown{false};

    boost::asio::thread_pool io{1};
    boost::asio::signal_set  sigs(io, SIGINT, SIGTERM);
    sigs.async_wait([&](boost::system::error_code const& ec, int num) {
        std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
        shutdown = true;
    });

    try {
        for (; !shutdown; std::this_thread::sleep_for(std::chrono::seconds(1))) {
            std::vector<Eigen::Vector3d> points;
            for (int i = 0; i < 3; ++i) {
                auto x = dist(gen), y = dist(gen), z = dist(gen);
                points.emplace_back(x, y, z);
                std::cout << x << " " << y << " " << z << std::endl;
            }

            Grab(points);
        }
    } catch (std::exception const& ex) {
        std::cerr << ex.what() << std::endl;
    }

    io.join();
}

在线演示:

© www.soinside.com 2019 - 2024. All rights reserved.