将majordomo代理与异步客户端一起使用

问题描述 投票:8回答:3

在阅读zeromq指南时,我遇到了客户端代码,它在循环中发送100k请求,然后在第二个循环中接收回复。

#include "../include/mdp.h"
#include <time.h>


int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdp_client_t *session = mdp_client_new ("tcp://localhost:5555", verbose);
    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        mdp_client_send (session, "echo", &request);
    }
    printf("sent all\n");

    for (count = 0; count < 100000; count++) {
        zmsg_t *reply = mdp_client_recv (session,NULL,NULL);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  Interrupted by Ctrl-C
        printf("reply received:%d\n", count);
    }
    printf ("%d replies received\n", count);
    mdp_client_destroy (&session);
    return 0;
}

我添加了一个计数器来计算worker(test_worker.c)发送给代理的回复数,以及mdp_broker.c中的另一个计数器来计算代理发送给客户端的回复数。这两个都高达100k,但客户端只收到大约37k回复。

如果客户端请求的数量设置为大约40k,那么它将收到所有回复。当客户端发送超过40k的异步请求时,有人可以告诉我为什么数据包会丢失吗?

我尝试将代理套接字的HWM设置为100k,但问题仍然存在:

static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
    int64_t hwm = 100000;
    //  Initialize broker state
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    zmq_setsockopt(self->socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));

    zmq_setsockopt(self->socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}
zeromq distributed-computing
3个回答
3
投票

在不设置HWM和使用默认TCP设置的情况下,仅使用50k消息就会丢失数据包。

以下内容有助于缓解代理处的数据包丢失:

  1. 为zeromq套接字设置HWM。
  2. 增加TCP发送/接收缓冲区大小。

这只能达到某一点。有两个客户端,每个客户端发送100k消息,经纪人能够正常管理。但当客户数量增加到三个时,他们就停止收到所有回复。

最后,帮助我确保没有数据包丢失的是以下列方式更改客户端代码的设计:

  1. 客户端可以一次发送最多N条消息。客户端的RCVHWM和代理的SNDHWM应足够高,以容纳总共N条消息。
  2. 之后,对于客户端收到的每个回复,它会发送两个请求。

1
投票

您发送100k消息,然后开始接收它们。因此,100k消息应存储在缓冲区中。当缓冲区耗尽且无法再存储消息时,您将达到ZeroMQ的高水位线。 ZeroMQ文档中指定了高水位线上的行为。

在上述代码的情况下,经纪人可能会丢弃一些消息,因为a majordomo broker使用ROUTER socket。其中一个解决方案是将发送/接收循环拆分为分离的线程


1
投票

为什么输了?

在ZeroMQ v2.1中,ZMQ_HWM的默认值是INF(无穷大),这有助于上述测试有些意义,但代价是内存溢出崩溃的风险很大,因为缓冲区分配策略没有受到约束/控制,因此至于达到一些物理极限。

从ZeroMQ v3.0 +开始,ZMQ_SNDHWM / ZMQ_RCVHWM默认为1000,之后可以设置。

您也可以阅读明确的警告

ØMQ不保证套接字将接受与ZMQ_SNDHWM消息一样多的数量,并且实际限制可能会低多达60-70%,具体取决于套接字上的消息流。

将发送/接收部分拆分为单独的线程有帮助吗?

没有。

快速解决?

是的,出于演示测试实验的目的,再次设置无限高水位标记,但要小心避免在任何生产级软件中进行此类练习。

为什么要以这种方式测试ZeroMQ性能?

如上所述,原始的演示测试似乎在其v2.1实现中具有一些含义。

从那时起,ZeroMQ已经发展了很多。一个非常好的阅读,以满足您对表现信封的特殊兴趣,这可能会让您更深入地了解这个领域,这在step by step guide with code examples on ZeroMQ protocol overheads/performance case-study on large file transfers

...我们已经遇到了一个问题:如果我们向ROUTER套接字发送太多数据,我们很容易将其溢出。简单但愚蠢的解决方案是在插座上放置一个无限的高水位标记。这是愚蠢的,因为我们现在无法防止耗尽服务器的内存。然而,如果没有无限的HWM,我们就有可能丢失大块文件。

试试这个:将HWM设置为1,000(在ZeroMQ v3.x中这是默认值),然后将块大小减小到100K,这样我们一次发送10K块。运行测试,你会发现它永远不会完成。正如zmq_socket()手册页以欢快的粗暴所说,对于ROUTER套接字:“ZMQ_HWM选项动作:丢弃”。

我们必须控制服务器预先发送的数据量。发送超出网络可以处理的数量是没有意义的。我们一次尝试发送一个块。在此版本的协议中,客户端将明确地说“给我块N”,服务器将从磁盘获取该特定块并发送它。

据我所知,最好的部分是在“模型3”流控制的结果表现中,人们可以从ZeroMQ指南中的精彩章节和现实生活中获得很多东西。

© www.soinside.com 2019 - 2024. All rights reserved.