ZeroMQ如何使用C 我是ZMQ的新手。我有多个发布商和一个客户。寻求建议以最佳方式实施它。当前,它利用单个客户端和服务器的回复-请求模式;必须将其扩展到多个发布者和一个订户。 此应用程序将在不支持C11的QNX系统上运行,因此zmq::mutlipart_t没有帮助。 void TransportLayer::Init() { socket.bind( "tcp://*:5555" ); } void TransportLayer::Receive() { while ( true ) { zmq::message_t request; string protoBuf; socket.recv( &request ); uint16_t id = *( (uint16_t*)request.data() ); protoBuf = std::string( static_cast<char*>( request.data() + sizeof( uint16_t ) ), request.size() - sizeof( uint16_t ) ); InterfaceLayer::getInstance()->ParseProtoBufTable( protoBuf ); } Send(); usleep( 1 ); } void TransportLayer::Send() { zmq::message_t reply( 1 ); memcpy( reply.data(), "#", 1 ); socket.send( reply ); } 这是我编写的代码,最初旨在仅侦听一个客户端,现在我必须扩展它以侦听多个客户端。 我尝试使用zmq::multipart_t,但这需要C11支持,但我们使用的QNX版本不支持C11。 我尝试实施建议的解决方案。我创建了2个发布商,它们连接到相同的静态位置。 观察: I]执行顺序:1.启动订户2.启动Publisher1(它仅发布一个数据值) 订户未收到此数据。 II]修改了Publisher1以在while循环中发送相同的数据执行顺序:1.启动订户2.启动Publisher1 3.启动Publsiher2。 现在我看到订户正在从两个发布者处接收数据。 这表明存在数据丢失的可能性。 我如何确保绝对不会丢失数据? 这是我的源代码: 发布者2: dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) { } void dummyFrontEnd::Init() { socket.connect("tcp://127.0.0.1:5555"); cout << "Connecting .... " << endl; } void dummyFrontEnd::SendData() { while ( std::getline(file, line_str) ) { std::stringstream ss(line_str); std::string direction; double tdiff; int i, _1939, pgn, priority, source, length, data[8]; char J, p, _0, dash, d; ss >> tdiff >> i >> J >> _1939 >> pgn >> p >> priority >> _0 >> source >> dash >> direction >> d >> length >> data[0] >> data[1] >> data[2] >> data[3] >> data[4] >> data[5] >> data[6] >> data[7]; timestamp += tdiff; while ( gcl_get_time_ms() - start_time < uint64_t(timestamp * 1000.0) - first_time ) { usleep(1); } if (arguments.verbose) { std::cout << timestamp << " " << i << " " << J << " " << _1939 << " " << pgn << " " << p << " " << priority << " " << _0 << " " << source << " " << dash << " " << direction << " " << d << " " << length << " " << data[0] << " " << data[1] << " " << data[2] << " " << data[3] << " " << data[4] << " " << data[5] << " " << data[6] << " " << data[7] << std::endl; } uint64_t timestamp_ms = (uint64_t)(timestamp * 1000.0); protoTable.add_columnvalues(uint64ToString(timestamp_ms)); /* timestamp */ protoTable.add_columnvalues(intToString(pgn)); /* PGN */ protoTable.add_columnvalues(intToString(priority)); /* Priority */ protoTable.add_columnvalues(intToString(source)); /* Source */ protoTable.add_columnvalues(direction); /* Direction */ protoTable.add_columnvalues(intToString(length)); /* Length */ protoTable.add_columnvalues(intToString(data[0])); /* data1 */ protoTable.add_columnvalues(intToString(data[1])); /* data2 */ protoTable.add_columnvalues(intToString(data[2])); /* data3 */ protoTable.add_columnvalues(intToString(data[3])); /* data4 */ protoTable.add_columnvalues(intToString(data[4])); /* data5 */ protoTable.add_columnvalues(intToString(data[5])); /* data6 */ protoTable.add_columnvalues(intToString(data[6])); /* data7 */ protoTable.add_columnvalues(intToString(data[7])); /* data8 */ zmq::message_t create_values(protoTable.ByteSizeLong()+sizeof(uint16_t)); *((uint16_t*)create_values.data()) = TABLEMSG_ID; // ID protoTable.SerializeToArray(create_values.data()+sizeof(uint16_t), protoTable.ByteSizeLong()); socket.send(create_values); protoTable.clear_columnvalues(); usleep(1); } } 发布者1: dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) { } void dummyFrontEnd::Init() { socket.connect("tcp://127.0.0.1:5555"); cout << "Connecting .... " << endl; } void dummyFrontEnd::SendData() { cout << "In SendData" << endl; while(1) { canlogreq canLogObj = canlogreq::default_instance(); canLogObj.set_fromhours(11); canLogObj.set_fromminutes(7); canLogObj.set_fromseconds(2); canLogObj.set_fromday(16); canLogObj.set_frommonth(5); canLogObj.set_fromyear(2020); canLogObj.set_tohours(12); canLogObj.set_tominutes(7); canLogObj.set_toseconds(4); canLogObj.set_today(17); canLogObj.set_tomonth(5); canLogObj.set_toyear(2020); zmq::message_t logsnippetmsg(canLogObj.ByteSizeLong() + sizeof(uint16_t)); *((uint16_t*)logsnippetmsg.data()) = 20; canLogObj.SerializeToArray(logsnippetmsg.data()+sizeof(uint16_t), canLogObj.ByteSizeLong()); socket.send(logsnippetmsg); usleep(1); canLogObj.clear_fromhours(); canLogObj.clear_fromminutes(); canLogObj.clear_fromseconds(); canLogObj.clear_fromday(); canLogObj.clear_frommonth(); canLogObj.clear_fromyear(); canLogObj.clear_tohours(); canLogObj.clear_tominutes(); canLogObj.clear_toseconds(); canLogObj.clear_today(); canLogObj.clear_tomonth(); canLogObj.clear_toyear(); } } 订户: TransportLayer::TransportLayer():context(1),socket(context,ZMQ_SUB){ } void TransportLayer::Init() { socket.bind("tcp://*:5555"); socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); } void TransportLayer::Receive() { cout << "TransportLayer::Receive " << " I am in server " << endl; static int count = 1; // Producer thread. while ( true ){ zmq::message_t request; string protoBuf; socket.recv(&request); uint16_t id = *((uint16_t*)request.data()); cout << "TransportLayer : " << "request.data: " << request.data() << endl; cout << "TransportLayer : count " << count << endl; count = count + 1; cout << "TransportLayer : request.data.size " << request.size() << endl; protoBuf = std::string(static_cast<char*>(request.data() + sizeof(uint16_t)), request.size() - sizeof(uint16_t)); cout << "ProtoBuf : " << protoBuf << endl; InterfaceLayer *interfaceLayObj = InterfaceLayer::getInstance(); switch(id) { case TABLEMSG_ID: cout << "Canlyser" << endl; interfaceLayObj->ParseProtoBufTable(protoBuf); break; case LOGSNIPPET_ID: cout << "LogSnip" << endl; interfaceLayObj->ParseProtoBufLogSnippet(protoBuf); interfaceLayObj->logsnippetSignal(); // publish the signal break; default: break; } usleep(1); } } 我是ZMQ的新手。我有多个发布商和一个客户。寻求建议以最佳方式实施它。当前,它利用单个客户端和服务器的回复-请求模式; ...

问题描述 投票:1回答:1

我是ZMQ的新手。我有多个发布商和一个客户。寻求建议以最佳方式实施它。当前,它利用单个客户端和服务器的回复-请求模式;必须将其扩展到多个发布者和一个订户。

此应用程序将在不支持C11的QNX系统上运行,因此zmq::mutlipart_t没有帮助。

void TransportLayer::Init()
{
    socket.bind( "tcp://*:5555" );
}

void TransportLayer::Receive()
{
    while ( true ) {
        zmq::message_t request;
        string protoBuf;
        socket.recv( &request );

        uint16_t id = *( (uint16_t*)request.data() );
        protoBuf = std::string( static_cast<char*>( request.data()
                                                  + sizeof( uint16_t )
                                                    ),
                                request.size() - sizeof( uint16_t )
                                );
        InterfaceLayer::getInstance()->ParseProtoBufTable( protoBuf );
    }
    Send();
    usleep( 1 );
}

void TransportLayer::Send()
{
    zmq::message_t reply( 1 );
    memcpy( reply.data(), "#", 1 );

    socket.send( reply );
}

这是我编写的代码,最初旨在仅侦听一个客户端,现在我必须扩展它以侦听多个客户端。

我尝试使用zmq::multipart_t,但这需要C11支持,但我们使用的QNX版本不支持C11。


我尝试实施建议的解决方案。我创建了2个发布商,它们连接到相同的静态位置。

观察:

I]执行顺序:1.启动订户2.启动Publisher1(它仅发布一个数据值)

订户未收到此数据。

II]修改了Publisher1以在while循环中发送相同的数据执行顺序:1.启动订户2.启动Publisher1 3.启动Publsiher2。

现在我看到订户正在从两个发布者处接收数据。

这表明存在数据丢失的可能性。

我如何确保绝对不会丢失数据?


这是我的源代码:

发布者2:

dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {

}
void dummyFrontEnd::Init()
{
    socket.connect("tcp://127.0.0.1:5555");
    cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData() {

while ( std::getline(file, line_str) ) {

        std::stringstream ss(line_str);
        std::string direction;

        double tdiff;
        int    i, _1939, pgn, priority, source, length, data[8];
        char   J, p, _0, dash, d;

        ss >> tdiff >> i >> J >> _1939 >> pgn >> p >> priority >> _0 >> source
           >> dash >> direction >> d >> length >> data[0] >> data[1] >> data[2]
           >> data[3] >> data[4] >> data[5] >> data[6] >> data[7];

        timestamp += tdiff;

        while (            gcl_get_time_ms() - start_time <
                uint64_t(timestamp * 1000.0) - first_time ) { usleep(1); }

        if (arguments.verbose) {
            std::cout << timestamp << " " << i << " " << J << " " << _1939 << " "
                << pgn << " " << p << " " << priority << " " << _0 << " " << source
                << " " << dash << " " << direction << " " << d << " " << length
                << " " << data[0] << " " << data[1] << " " << data[2] << " "
                << data[3] << " " << data[4] << " " << data[5] << " " << data[6]
                << " " << data[7] << std::endl;
        }

        uint64_t timestamp_ms = (uint64_t)(timestamp * 1000.0);

        protoTable.add_columnvalues(uint64ToString(timestamp_ms)); /* timestamp */
        protoTable.add_columnvalues(intToString(pgn));             /* PGN       */
        protoTable.add_columnvalues(intToString(priority));        /* Priority  */
        protoTable.add_columnvalues(intToString(source));          /* Source    */
        protoTable.add_columnvalues(direction);                    /* Direction */
        protoTable.add_columnvalues(intToString(length));          /* Length    */
        protoTable.add_columnvalues(intToString(data[0]));         /* data1     */
        protoTable.add_columnvalues(intToString(data[1]));         /* data2     */
        protoTable.add_columnvalues(intToString(data[2]));         /* data3     */
        protoTable.add_columnvalues(intToString(data[3]));         /* data4     */
        protoTable.add_columnvalues(intToString(data[4]));         /* data5     */
        protoTable.add_columnvalues(intToString(data[5]));         /* data6     */
        protoTable.add_columnvalues(intToString(data[6]));         /* data7     */
        protoTable.add_columnvalues(intToString(data[7]));         /* data8     */

    zmq::message_t create_values(protoTable.ByteSizeLong()+sizeof(uint16_t));
        *((uint16_t*)create_values.data()) = TABLEMSG_ID;  // ID
        protoTable.SerializeToArray(create_values.data()+sizeof(uint16_t), protoTable.ByteSizeLong());

        socket.send(create_values);

        protoTable.clear_columnvalues();
        usleep(1);
    }

}

发布者1:

dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {

}
void dummyFrontEnd::Init()
{
    socket.connect("tcp://127.0.0.1:5555");
    cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData()
{
   cout << "In SendData" << endl;

   while(1) { 
       canlogreq canLogObj = canlogreq::default_instance();
                 canLogObj.set_fromhours(11);
                 canLogObj.set_fromminutes(7);
                 canLogObj.set_fromseconds(2);
                 canLogObj.set_fromday(16);
                 canLogObj.set_frommonth(5);
                 canLogObj.set_fromyear(2020);
                 canLogObj.set_tohours(12);
                 canLogObj.set_tominutes(7);
                 canLogObj.set_toseconds(4);
                 canLogObj.set_today(17);
                 canLogObj.set_tomonth(5);
                 canLogObj.set_toyear(2020);

       zmq::message_t logsnippetmsg(canLogObj.ByteSizeLong() + sizeof(uint16_t));

       *((uint16_t*)logsnippetmsg.data()) = 20;

       canLogObj.SerializeToArray(logsnippetmsg.data()+sizeof(uint16_t), canLogObj.ByteSizeLong());

       socket.send(logsnippetmsg);

       usleep(1);

       canLogObj.clear_fromhours();
       canLogObj.clear_fromminutes();
       canLogObj.clear_fromseconds();
       canLogObj.clear_fromday();
       canLogObj.clear_frommonth();
       canLogObj.clear_fromyear();
       canLogObj.clear_tohours();
       canLogObj.clear_tominutes();
       canLogObj.clear_toseconds();
       canLogObj.clear_today();
       canLogObj.clear_tomonth();
       canLogObj.clear_toyear();
   }

}

订户:

TransportLayer::TransportLayer():context(1),socket(context,ZMQ_SUB){ }
void TransportLayer::Init()
{
    socket.bind("tcp://*:5555"); 
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
}
void TransportLayer::Receive()
{
    cout << "TransportLayer::Receive " << " I am in server " << endl;

    static int count = 1;
    // Producer thread.
    while ( true ){

        zmq::message_t request;
        string protoBuf;

        socket.recv(&request);

        uint16_t id = *((uint16_t*)request.data());

        cout << "TransportLayer : " << "request.data:  " << request.data() << endl;
        cout << "TransportLayer : count " << count << endl; count = count + 1;
        cout << "TransportLayer : request.data.size " << request.size() << endl;

        protoBuf = std::string(static_cast<char*>(request.data() + sizeof(uint16_t)), request.size() - sizeof(uint16_t));

        cout << "ProtoBuf : " << protoBuf << endl;

        InterfaceLayer *interfaceLayObj = InterfaceLayer::getInstance();

        switch(id) {
            case TABLEMSG_ID:   cout << "Canlyser" << endl;
                                interfaceLayObj->ParseProtoBufTable(protoBuf);
                                break; 
            case LOGSNIPPET_ID: cout << "LogSnip" << endl;
                                interfaceLayObj->ParseProtoBufLogSnippet(protoBuf);
                                interfaceLayObj->logsnippetSignal(); // publish the signal
                                break;
            default:            break;
        }

        usleep(1);

    }

}
c++ zeromq
1个回答
0
投票

Q“如何使用C 多个发布者和单个客户端,?”


如果从未使用过ZeroMQ,在这里可以先研究一下"ZeroMQ Principles in less than Five Seconds",然后再探讨更多细节


因此,没有明确说明QNX版本,因此让我们可以正常使用。

如上所述,在"ZeroMQ Principles in less than Five Seconds"中,单个客户端(具有SUB-Archetype)可能zmq_connect( ? ),但是以管理一些我不知道的方式来管理所有其他方式,当前方式以及将来的任何方式PUB-s传递给zmq_bind(),此后让SUB以某种方式让SUB学习到zmq_connect( ? )的位置,以便从新绑定的PUB -peer获得一些消息。 >

因此,使单个SUB代理执行zmq_bind(),并让当前或将来的任何PUB-随即针对单个对象执行zmq_connect(),这是一种更聪明的方法,静态的已知SUB的位置(这并不是说,他们不能使用任何可用的传输类-一个inproc://

,另一个tcp://,一些[C0 ],如果QNX允许并且系统架构要求这样做(并且显然ipc://代理已经公开了正确配置的AccessNode来接收此类连接)。

接下来,您的SUB

-客户端必须配置其订阅筛选主题列表:是“接收一切!”的命令。 :
SUB

鉴于此工作,您的下一个任务是使设置足够健壮(将... retCode = zmq_setsockopt( <aSubSocketINSTANCE>, ZMQ_SUBSCRIBE, "", 0 ); assert( retCode == 0 && "FAILED: at ZMQ_SUBSCRIBE order " ); ... 的显式设置为0,访问策略,安全性,可扩展的资源,L2 / L3网络保护措施,等等。]

并且您已经完成了充分利用适合您QNX系统设计需求的ZeroMQ。

使用这样的智能功能强大的信令/消息框架进行愉快的消息收发!

© www.soinside.com 2019 - 2024. All rights reserved.