您如何使用ZeroMQ开发简单的DEALER / ROUTER消息流?

问题描述 投票:1回答:1

我对TCP消息传递(以及一般而言的编程)相当陌生,我正在尝试使用ZeroMQ开发一个简单的ROUTER / DEALER消息对,但正努力使路由器接收来自经销商的消息并发回。

我可以做一个简单的REQ/REP模式,没有问题,我可以在其中将一条消息从计算机发送到我的VM。

但是,当尝试开发ROUTER/DEALER对时,我似乎无法获得ROUTER实例来接收消息(VM上为ROUTER,主机上为DEALER)。我取得了一些成功,可以在while(){...}循环中向50条消息发送垃圾邮件,但无法发送一条消息,而让[[ROUTER发回一封邮件。

因此,根据我的阅读,ROUTER/DEALER对中的TCP消息在开始时以定界符0发送,并且必须先将此0发送给ROUTER以注册传入消息。

所以我只想将消息

“ ROUTER_TEST”发送到我的服务器,并让我的服务器以“ RECEIVED”进行响应。

经销商

#include <cstdlib> #include <iostream> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <assert.h> #include <stdio.h> #include "zmq.h" const char connection[] = "tcp://10.0.10.76:5555"; int main(void) { int major, minor, patch; zmq_version(&major, &minor, &patch); printf("\nInstalled ZeroMQ version: %d.%d.%d\n", major, minor, patch); printf("Connecting to: %s\n", connection); void *context = zmq_ctx_new(); void *requester = zmq_socket(context, ZMQ_DEALER); int zc = zmq_connect(requester, connection); std::cout << "zmq_connect = " << zc << std::endl; int sm = zmq_socket_monitor(requester, connection, ZMQ_EVENT_ALL); std::cout << "zmq_socket_monitor = " << sm << std::endl; char messageSend[] = "ROUTER_TEST"; int request_nbr; int n = zmq_send(requester, NULL, 0, ZMQ_DONTWAIT|ZMQ_SNDMORE ); int ii = 0; if(n==0) { std::cout << "n = " << n << std::endl; while (ii < 50) { n = zmq_send(requester, messageSend, 31, ZMQ_DONTWAIT); ii++; } } return 0; }

ROUTER

// SERVER #include <cstdlib> #include <iostream> #include <string.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include "zmq.h" int main(void) { void *context = zmq_ctx_new(); void *responder = zmq_socket(context, ZMQ_ROUTER); printf("THIS IS WORKING - ROUTER\n"); int rc = zmq_bind(responder, "tcp://*:5555"); assert(rc == 0); zmq_pollitem_t pollItems[] = { {responder, 0, ZMQ_POLLIN, -1}}; int sm = zmq_socket_monitor(responder, "tcp://*:5555", ZMQ_EVENT_LISTENING); std::cout << "zmq_socket_monitor = " << sm << std::endl; uint8_t buffer[15]; while (1) { int rc = zmq_recv(responder, buffer, 5, ZMQ_DONTWAIT); if (rc == 0) { std::cout << "zmq_recv = " << rc << std::endl; zmq_send(responder, "RECIEVED", 9,0); } zmq_poll(pollItems, sizeof(pollItems), -1); } return 0; }
tcp zeromq
1个回答
0
投票

如果从未使用过ZeroMQ,可以在深入了解更多细节之前先看看"ZeroMQ Principles in less than Five Seconds"

您的代码调用,在[[DEALER

端,一系列:void *requester = zmq_socket( context, ZMQ_DEALER // <-- .STO <ZMQ_DEALER>, *requester ); ... int n = zmq_send( requester, // <~~ <~~ <~~ <~~ <~~ <~~ .STO 0, n NULL, // NULL,sizeof(NULL)== 0 0, // explicitly declared 0 ZMQ_DONTWAIT // _DONTWAIT flag | ZMQ_SNDMORE //---- 1x ZMQ_SNDMORE flag== ); // 1.Frame in 1st MSG int ii = 0; // MSG-under-CONSTRUCTION if ( n == 0 ) // ...until complete, not yet sent { std::cout << "PREVIOUS[" << ii << ".] CALL of zmq_send() has returned n = " << n << std::endl; while ( ii < 50 ) { ii++; n = zmq_send( requester, //---------//---- 1x ZMQ_SNDMORE following messageSend, // // 2.Frame in 1st MSG 31, // // MSG-under-CONSTRUCTION, but ZMQ_DONTWAIT // // NOW complete & will get sent ); //---------//----49x monoFrame MSGs follow } } ...

在另一侧,

ROUTER
侧代码会发生什么?


... while (1) { int rc = zmq_recv( responder, //----------------- 1st .recv() buffer, 5, ZMQ_DONTWAIT ); if ( rc == 0 ) { std::cout << "zmq_recv = " << rc << std::endl; zmq_send( responder, // _____________________ void *socket "RECEIVED", // _____________________ void *buffer 9, // _____________________________ size_t len 0 // _____________________________ int flags ); } zmq_poll( pollItems, sizeof( pollItems ), -1 // __________________________________ block ( forever ) );// till ( if ever ) ...? }

这里,很可能是rc == 0,但是一次,如果没有被错过,但再也没有]]
请注意,如果.recv()呼叫也被

ZMQ_RECVMORE]标记,则您的代码不会以任何方式检测到-表示有必要先还.recv()-其余所有在能够.send() -any-answer ...

之前,第一个消息的多帧部分

处理多部分消息的应用程序

必须使用

在调用ZMQ_RCVMORE确定是否还有其他部分要接收之后,

zmq_getsockopt(3) zmq_recv()选项。

接下来,buffermessageSend消息-“有效载荷”是一种脆弱的实体,应重新组合(有关详细信息,最好再次阅读有关如何仔细初始化的所有详细信息,与任何

zmq_msg_t对象一起使用并安全地进行触摸,因为成功.send()/.recv()之后,低级API(自2.11.x +开始)认为它们已被废弃,不可重复使用。还请注意,messageSendnot(必须在代码中输入),长度为31- char[],是吗?是否有这样做的特殊意图?

zmq_send()函数如果成功,将返回消息中的字节数。否则,它将返回-1并将errno设置为以下定义的值之一。
{ EAGAIN, ENOTSUP, EINVAL, EFSM, ETERM, ENOTSOCK, EINTR, EHOSTUNREACH }

未测试错误状态意味着对EFSMREQ/REP的实际状态(请参阅DEALER/ROUTER和其他潜在的故障解释器一无所知)(扩展).send()/.recv()/.send()/.recv()/...这些步骤的强制性dFSA顺序


“因此,根据我的阅读,ROUTER/DEALER对中的TCP消息在开始时以定界符0发送,并且必须先将此0发送给ROUTER以注册传入的消息。 “

这似乎是一个错误的地方。应用程序端可以自由编写任意数量的单帧或多帧消息,但是ROUTER前置身份帧的“技巧”是在没有用户帮助的情况下执行的(消息标记在任何(现在,原则上所有)多帧(d)消息都传递到应用程序端(使用接收方的.recv()方法)。上面已指出了对多帧消息的应有处理。
© www.soinside.com 2019 - 2024. All rights reserved.