我正在尝试使用 boost interprocess 通过共享内存发送数据结构。我想序列化数据并将其作为二进制发送。
我的方法基于这个答案:
但是,一旦我切换到结构类型,我的代码就不会发送数据。一切都编译并运行良好。但是,当我关闭并重新打开接收器应用程序时,数据不再发送/接收。我怎样才能做到这一点,让接收器可以关闭和打开,并且会自动重新建立连接?
#datastruct.h
#ifndef DATASTRUCT_HPP
#define DATASTRUCT_HPP
#include <string>
#include <vector>
#include <boost/serialization/vector.hpp>
#define MAX_SIZE 150000
namespace dataStruct {
struct VUserPoint
{
float PositionX;
float PositionY;
float PositionZ;
template <typename Archive>
void serialize(Archive& ar, const unsigned int version)
{
ar & PositionX;
ar & PositionY;
ar & PositionZ;
}
};
struct FvpData
{
//Array of current points in frame
std::vector<VUserPoint> UserPoints;
int frameNumber;
template <typename Archive>
void serialize(Archive& ar, const unsigned int version)
{
ar & UserPoints;
ar& frameNumber;
}
};
} // namespace
#endif // DATASTRUCT_HPP
#sender.cpp
#include "DataSender.h"
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>
using namespace boost::interprocess;
int num = 1;
void DataSender::Grab(std::vector<Eigen::Vector3d> points)
{
dataStruct::FvpData data;
//add userpoints
data.UserPoints.clear();
for (int i = 0; i < points.size(); i++)
{
dataStruct::VUserPoint pnt;
pnt.PositionX = points[i].x();
pnt.PositionY = points[i].y();
pnt.PositionZ = points[i].z();
data.UserPoints.push_back(pnt);
}
num += 1;
data.frameNumber = num;
try
{
message_queue mq
(
open_or_create,
"mq",
100,
MAX_SIZE
);
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << data;
std::string serialized_string(oss.str());
mq.send(serialized_string.data(), serialized_string.size(), 1);
std::cout << data.frameNumber << std::endl;
}
catch (interprocess_exception& ex)
{
std::cerr << ex.what() << std::endl;
}
}
#reciever.cpp
#include <iostream>
#include <string>
#include <iostream>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include "DataStruct.hpp"
using namespace boost::interprocess;
int main()
{
try
{
message_queue mq
(
open_or_create,
"mq",
100,
MAX_SIZE
);
message_queue::size_type recvd_size;
unsigned int priority;
while (true)
{
dataStruct::FvpData me;
std::stringstream iss;
std::string serialized_string;
serialized_string.resize(MAX_SIZE);
mq.receive(&serialized_string[0], MAX_SIZE, recvd_size, priority);
iss << serialized_string;
boost::archive::binary_iarchive ia(iss);
ia >> me;
std::cout << me.frameNumber << std::endl;
// std::cout << me.name << std::endl;
}
}
catch (interprocess_exception& ex)
{
std::cerr << ex.what() << std::endl;
}
// message_queue::remove("mq");
}
首先我简化并扩展了重现器:
文件
datastruct.hpp
#pragma once
#include <boost/serialization/vector.hpp>
static constexpr inline size_t MAX_SIZE = 150'000;
namespace dataStruct {
struct VUserPoint {
float PositionX, PositionY, PositionZ;
template <typename Archive> void serialize(Archive& ar, unsigned) {
ar & PositionX & PositionY & PositionZ;
}
};
struct FvpData {
// Array of current points in frame
std::vector<VUserPoint> UserPoints;
int frameNumber;
template <typename Archive> void serialize(Archive& ar, unsigned) { ar & UserPoints & frameNumber; }
};
} // namespace dataStruct
文件
sender.cpp
#include "datastruct.hpp"
#include <Eigen/Core>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
namespace bip = boost::interprocess;
static int num = 1;
void Grab(std::vector<Eigen::Vector3d> points) {
dataStruct::FvpData data{{}, ++num};
for (auto const& p : points)
data.UserPoints.push_back(dataStruct::VUserPoint{
.PositionX = static_cast<float>(p.x()),
.PositionY = static_cast<float>(p.y()),
.PositionZ = static_cast<float>(p.z()),
});
try {
bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
std::stringstream oss;
{
boost::archive::binary_oarchive oa(oss);
oa << data;
} // completes the archive!
std::string serialized_string(oss.str());
mq.send(serialized_string.data(), serialized_string.size(), 1);
std::cout << data.frameNumber << std::endl;
} catch (bip::interprocess_exception const& ex) {
std::cerr << ex.what() << std::endl;
}
}
#include <random>
#include <thread>
int main() {
std::cout << std::fixed << std::setprecision(3);
std::mt19937 gen(std::random_device{}());
std::uniform_real_distribution<double> dist(-10, 10);
for (int n = 100; n--; std::this_thread::sleep_for(std::chrono::seconds(1))) {
std::vector<Eigen::Vector3d> points;
for (int i = 0; i < 3; ++i) {
auto x = dist(gen), y = dist(gen), z = dist(gen);
points.emplace_back(x, y, z);
std::cout << x << " " << y << " " << z << std::endl;
}
Grab(points);
}
}
文件
receiver.cpp
#include "datastruct.hpp"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
namespace bip = boost::interprocess;
int main() {
std::cout << std::fixed << std::setprecision(3);
try {
bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
bip::message_queue::size_type recvd_size;
unsigned int priority;
while (true) {
dataStruct::FvpData me;
std::stringstream iss;
std::string serialized_string;
serialized_string.resize(MAX_SIZE);
mq.receive(&serialized_string[0], MAX_SIZE, recvd_size, priority);
iss << serialized_string;
boost::archive::binary_iarchive ia(iss);
ia >> me;
std::cout << me.frameNumber << std::endl;
for (auto const& p : me.UserPoints)
std::cout << p.PositionX << " " << p.PositionY << " " << p.PositionZ << "\n";
}
} catch (bip::interprocess_exception const& ex) {
std::cerr << ex.what() << std::endl;
}
// message_queue::remove("mq");
}
接下来,我确认可以正常运行。
您没有说如何停止程序。由于显示的代码中没有任何内容,我可能会假设您使用 Ctrl-C (或更糟):
如您所见,您遇到了锁定异常。你需要干净地退出。您可以实现自己的优雅关闭,或者使用信号来拦截 Ctrl-C/终止:
std::atomic_bool shutdown{false};
boost::asio::thread_pool io{1};
boost::asio::signal_set sigs(io, SIGINT, SIGTERM);
sigs.async_wait([&](boost::system::error_code const& ec, int num) {
std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
shutdown = true;
});
例如调整后的sender.cpp
#include "datastruct.hpp"
#include <Eigen/Core>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
namespace bip = boost::interprocess;
static int num = 1;
void Grab(std::vector<Eigen::Vector3d> points) {
dataStruct::FvpData data{{}, ++num};
for (auto const& p : points)
data.UserPoints.push_back(dataStruct::VUserPoint{
.PositionX = static_cast<float>(p.x()),
.PositionY = static_cast<float>(p.y()),
.PositionZ = static_cast<float>(p.z()),
});
try {
bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
std::stringstream oss;
{
boost::archive::binary_oarchive oa(oss);
oa << data;
} // completes the archive!
std::string serialized_string(oss.str());
mq.send(serialized_string.data(), serialized_string.size(), 1);
std::cout << data.frameNumber << std::endl;
} catch (bip::interprocess_exception const& ex) {
std::cerr << ex.what() << std::endl;
}
}
#include <random>
#include <thread>
int main() {
std::cout << std::fixed << std::setprecision(3);
std::mt19937 gen(std::random_device{}());
std::uniform_real_distribution<double> dist(-10, 10);
std::atomic_bool shutdown{false};
boost::asio::thread_pool io{1};
boost::asio::signal_set sigs(io, SIGINT, SIGTERM);
sigs.async_wait([&](boost::system::error_code const& ec, int num) {
std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
shutdown = true;
});
try {
for (; !shutdown; std::this_thread::sleep_for(std::chrono::seconds(1))) {
std::vector<Eigen::Vector3d> points;
for (int i = 0; i < 3; ++i) {
auto x = dist(gen), y = dist(gen), z = dist(gen);
points.emplace_back(x, y, z);
std::cout << x << " " << y << " " << z << std::endl;
}
Grab(points);
}
} catch (std::exception const& ex) {
std::cerr << ex.what() << std::endl;
}
io.join();
}
观看直播: