我正在尝试使用 zeromq 框架实现 pub sub 设计模式。 这个想法是先启动订阅者,然后再启动发布者。 订阅者将收听 100 条消息,发布者将发布 100 条消息。 到目前为止,一切都很好... 然而实际情况是,即使发布者启动时订阅者已经启动并运行,订阅者也不会收到所有消息(如果发布者发送至少 500 条消息,订阅者将接收 100 条消息)信息)。似乎发布者发送的第一条消息没有发送给订阅者。
有什么想法吗?
提前致谢, 欧麦
订阅者代码(在发布者之前启动)
int i=0;
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
for (int update_nbr = 0; update_nbr < 100; update_nbr++)
{
zmq::message_t update;
subscriber.recv(&update);
i++;
std::cout<<"receiving :"<<i<<std::endl;
}
发布者代码(在订阅者之后启动)
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
int i = 0;
for (int update_nbr = 0; update_nbr < 100; update_nbr++)
{
// Send message to all subscribers
zmq::message_t request (20);
time_t seconds;
seconds = time (NULL);
char update [20]="";
sprintf (update, "%ld", seconds);
memcpy ((void *) request.data (), update,strlen(update));
publisher.send(request);
i++;
std::cout << "sending :" << i << std::endl;
}
参见https://zguide.zeromq.org/docs/chapter2/#Missing-Message-Problem-Solver(图 25 中的流程图,以及下面的解释)
基本上,建立连接需要一点时间(几毫秒),并且在这段时间内可能会丢失很多消息。发布者在开始发布之前需要睡一会儿,或者(更好)它需要明确地与订阅者同步。
请看指南.
在0MQ中,send()成功并不意味着数据立即通过网络发送。 http://api.zeromq.org/2-1:zmq-send。您的消息非常小,AFAIR 0MQ 对小消息进行某种缓冲以更有效地使用网络。
如果我没记错的话,0MQ的
out_batch_size
中的config.hpp
控制了这种行为。
要看的一件事(除了之前的评论者所指出的)是您的关机程序。
代码片段可能只是不完整,但我看不出你是如何处理关机的。特别是你实际上可能会丢失发送的last消息。查看 zmq_close、zmq_term 和 ZMQ_LINGER 的文档。如果您 not 实际上调用这些函数,而只是终止应用程序,那么有可能使用 zmq_send() 发送但尚未传输到网络的消息在关闭时丢失.
要检查丢失了哪些消息,您可以尝试发送除时间戳之外的序列号。