使用CZMQ-4.1.0新的zsock API的异步Majordomo Pattern示例更新不起作用

问题描述 投票:1回答:1

在使用brew安装zmq和czmq之后,我尝试编译并播放Asynchronous-Majordomo-Pattern,但它不起作用,因为它需要czmq v3。据我所知,我尝试使用zactor将其更新为v4,因为

zthread被弃用以支持zactor http://czmq.zeromq.org/czmq3-0:zthread

所以现在以下代码看起来很好,因为更新了async-majordomo模式,但它没有按预期工作,当我通过终端运行时它不会创建任何线程。

//  Round-trip demonstrator
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.

#include "czmq.h"
#include <stdlib.h>

void dbg_write_in_file(char * txt, int nb_request) {
    FILE * pFile;
    pFile = fopen ("myfile.txt","a");

    if (pFile!=NULL)
    {
        fputs (txt, pFile);

        char str_nb_request[12];
        sprintf(str_nb_request, "%d", nb_request);
        fputs (str_nb_request, pFile);

        fputs ("\n", pFile);
        fclose (pFile);
    }
}

static void
client_task (zsock_t *pipe, void *args)
{
    zsock_t *client = zsock_new (ZMQ_DEALER);
    zsock_connect (client, "tcp://localhost:5555");
    printf ("Setting up test...\n");
    zclock_sleep (100);

    printf("child 1: parent: %i\n\n", getppid());
    printf("child 1: my pid: %i\n\n", getpid());

    int requests;
    int64_t start;

    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");

        // stuck here /!\

        char *reply = zstr_recv (client);
        zstr_free (&reply);

        // check if it does something
        dbg_write_in_file("sync round-trip requests : ", requests);
        // end check
    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));

    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++) {
        zstr_send (client, "hello");

        // check if it does something
        dbg_write_in_file("async round-trip send requests : ", requests);
        // end check
    }
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        zstr_free (&reply);

        // check if it does something
        dbg_write_in_file("async round-trip rec requests : ", requests);
        // end check
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));

    zstr_send (pipe, "done");
}

//  Here is the worker task. All it does is receive a message, and
//  bounce it back the way it came:

static void
worker_task (zsock_t *pipe, void *args)
{
    printf("child 2: parent: %i\n\n", getppid());
    printf("child 2: my pid: %i\n\n", getpid());

    zsock_t *worker = zsock_new (ZMQ_DEALER);
    zsock_connect (worker, "tcp://localhost:5556");

    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);    
    }
    zsock_destroy (&worker);
}

//  Here is the broker task. It uses the zmq_proxy function to switch
//  messages between frontend and backend:

static void
broker_task (zsock_t *pipe, void *args)
{
    printf("child 3: parent: %i\n\n", getppid());
    printf("child 3: my pid: %i\n\n", getpid());

    //  Prepare our sockets
    zsock_t *frontend = zsock_new (ZMQ_DEALER);
    zsock_bind (frontend, "tcp://localhost:5555");
    zsock_t *backend = zsock_new (ZMQ_DEALER);
    zsock_bind (backend, "tcp://localhost:5556");
    zmq_proxy (frontend, backend, NULL);

    zsock_destroy (&frontend);
    zsock_destroy (&backend);
}

//  Finally, here's the main task, which starts the client, worker, and
//  broker, and then runs until the client signals it to stop:

int main (void)
{
    //  Create threads
    zactor_t *client = zactor_new (client_task, NULL);
    assert (client);    
    zactor_t *worker = zactor_new (worker_task, NULL);
    assert (worker);
    zactor_t *broker = zactor_new (broker_task, NULL);
    assert (broker);

    //  Wait for signal on client pipe
    char *signal = zstr_recv (client);
    zstr_free (&signal);

    zactor_destroy (&client);
    zactor_destroy (&worker);
    zactor_destroy (&broker);
    return 0;
}

当我运行它时,看起来程序卡在评论中

//卡在这里/!\

然后,当我因为没有完成而杀死它,或者根本不打印任何内容时,我需要按下五次Ctrl + C(^ C)。只有这样,它在控制台上看起来更加冗长,就像它确实在运行一样。 =>请注意,我删除了所有printf()步骤的输出,因为它读起来非常麻烦。

当它运行时,它只会在发送五个Ctrl + C(^ C)后才会向dbg_write_in_file()函数调用的文件写入任何内容。

客户工作者和经纪人任务都返回相同的getppid号码(我的终端)和getpid作为程序本身。

我用gcc trippingv4.c -o trippingv4 -L/usr/local/lib -lzmq -lczmq编译。

当我试图杀死它时:

./trippingv4
Setting up test...
child 1: parent: 60967

child 1: my pid: 76853

Synchronous round-trip test...
^Cchild 2: parent: 60967

child 2: my pid: 76853

^Cchild 3: parent: 60967

child 3: my pid: 76853

^C^C^CE: 18-02-28 00:16:37 [76853]dangling 'PAIR' socket created at src/zsys.c:471
E: 18-02-28 00:16:37 [76853]dangling 'DEALER' socket created at trippingv4.c:29
E: 18-02-28 00:16:37 [76853]dangling 'PAIR' socket created at src/zsys.c:471
E: 18-02-28 00:16:37 [76853]dangling 'DEALER' socket created at trippingv4.c:89

更新

感谢您详细解答@ user3666197。在第一部分中,编译器不编译断言调用,因此我只是显示值而是在视觉上进行比较,它们是相同的。

int czmqMAJOR,
czmqMINOR,
czmqPATCH;

zsys_version ( &czmqMAJOR, &czmqMINOR, &czmqPATCH );
printf( "INF: detected CZMQ ( %d, %d, %d ) -version\n",
         czmqMAJOR,
         czmqMINOR,
         czmqPATCH
         );

printf( "INF: CZMQ_VERSION_MAJOR %d, CZMQ_VERSION_MINOR %d, CZMQ_VERSION_PATCH %d\n",
         CZMQ_VERSION_MAJOR,
         CZMQ_VERSION_MINOR,
         CZMQ_VERSION_PATCH
         );

输出:

INF: detected CZMQ ( 4, 1, 0 ) -version
INF: CZMQ_VERSION_MAJOR 4, CZMQ_VERSION_MINOR 1, CZMQ_VERSION_PATCH 0

zsys_info调用确实编译但在终端上没有显示任何内容,即使使用fflush(stdout)以防万一,所以我只使用printf

INF: This system's Context() limit is 65535 ZeroMQ socketsINF: current state of the global Context()-instance has:
     ( 1 )-IO-threads ready
     ( 1 )-ZMQ_BLOCKY state

然后我用zsys_set_io_threads(2)和/或zmq_ctx_set (aGlobalCONTEXT, ZMQ_BLOCKY, false);更改了全局上下文线程值,仍然被阻止。看起来zactor不适用于系统线程,因为zthread是...或者没有给出类似的行为。鉴于我在zeromq(也是零)的经验,我可能会尝试一些无法实现的东西。

更新已解决但无法解决

我的主要错误是没有正确启动zactor实例

一个actor函数必须在初始化时调用zsock_signal(pipe)并且必须监听管道并退出$ TERM命令。

并且在调用zactor_destroy (&proxy);之前没有阻止zactor的代理执行

我让下面的最终代码,但你仍然需要在Ctrl + C结束时退出,因为我没有弄清楚如何正确管理$TERM信号。此外,zactor似乎仍然没有使用系统theads。它可能是这样的设计,但我不知道它是如何在木材背后起作用的。

//  Round-trip demonstrator
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.

#include <czmq.h>

static void
client_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Client"));
    zsock_signal (pipe, 0);

    zsock_t *client = zsock_new (ZMQ_DEALER);
    zsock_connect (client, "tcp://127.0.0.1:5555");

    printf ("Setting up test...\n");
    zclock_sleep (100);

    int requests;
    int64_t start;

    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");

        zmsg_t *msgh = zmsg_recv (client);
        zmsg_destroy (&msgh);

    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));

    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++) {
        zstr_send (client, "hello");
    }
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        zstr_free (&reply);
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));

    zstr_send (pipe, "done");
    printf("send 'done' to pipe\n");
}

//  Here is the worker task. All it does is receive a message, and
//  bounce it back the way it came:

static void
worker_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Worker"));
    zsock_signal (pipe, 0);

    zsock_t *worker = zsock_new (ZMQ_DEALER);
    zsock_connect (worker, "tcp://127.0.0.1:5556");

    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);
        // zstr_send (worker, "hello back"); // Give better perf I don't know why

    }
    zsock_destroy (&worker);
}

//  Here is the broker task. It uses the zmq_proxy function to switch
//  messages between frontend and backend:

static void
broker_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Task"));
    zsock_signal (pipe, 0);

    //  Prepare our proxy and its sockets
    zactor_t *proxy = zactor_new (zproxy, NULL);
    zstr_sendx (proxy, "FRONTEND", "DEALER", "tcp://127.0.0.1:5555", NULL);
    zsock_wait (proxy);
    zstr_sendx (proxy, "BACKEND", "DEALER", "tcp://127.0.0.1:5556", NULL);
    zsock_wait (proxy);

    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (pipe);
        if (!msg)
            break;              //  Interrupted
        char *command = zmsg_popstr (msg);

        if (streq (command, "$TERM")) {
            terminated = true;
            printf("broker received $TERM\n");
        }

        freen (command);
        zmsg_destroy (&msg);
    }

    zactor_destroy (&proxy);
}

//  Finally, here's the main task, which starts the client, worker, and
//  broker, and then runs until the client signals it to stop:

int main (void)
{

    //  Create threads
    zactor_t *client = zactor_new (client_task, "Hello, Client");
    assert (client);
    zactor_t *worker = zactor_new (worker_task, "Hello, Worker");
    assert (worker);
    zactor_t *broker = zactor_new (broker_task, "Hello, Task");
    assert (broker);

    char *signal = zstr_recv (client);
    printf("signal %s\n", signal);
    zstr_free (&signal);

    zactor_destroy (&client);
    printf("client done\n");
    zactor_destroy (&worker);
    printf("worker done\n");
    zactor_destroy (&broker);
    printf("broker done\n");

    return 0;
}
c multithreading zeromq
1个回答
1
投票

让我们一步一步地诊断原状态:

int czmqMAJOR,
    czmqMINOR,
    czmqPATCH;

zsys_version ( &czmqMAJOR, &czmqMINOR, &czmqPATCH );
printf( "INF: detected CZMQ( %d, %d, %d )-version",
         czmqMAJOR,
         czmqMINOR,
         czmqPATCH
         );
assert ( czmqMAJOR == CZMQ_VERSION_MAJOR & "Major: does not match\n" );
assert ( czmqMINOR == CZMQ_VERSION_MINOR & "Minor: does not match\n" );
assert ( czmqPATCH == CZMQ_VERSION_PATCH & "Patch: does not match\n" );

如果这符合您的期望,您可能希望DLL版本匹配并在适当的位置找到。


下一个:

可能会测试整个马戏团以非阻塞模式运行,以证明,没有其他阻止程序,但作为简要检查,我没有在CZMQ-API中发现这样的选项,本机API允许标记NOBLOCK选项{ _send() | _recv() }-operations,它可以防止它们被阻塞(在DEALER-s的情况下可能是_send()套接字实例的情况,当时还没有任何具有POSACK-ed .bind()/.connect()状态的交易对手)。

在这里,我没有找到一些工具来像在本机API中那样快速地执行此操作。也许你会有更多的运气来完成这个。


如果已准备就绪,请测试全局Context()实例的存在:

在第一个套接字实例化之前添加,以确保我们在任何和所有套接字生成之前以及它们各自的_bind()/_connect()操作下面的自我报告行,使用:

 zsys_info ( "INF: This system's Context() limit is %zu ZeroMQ sockets",
              zsys_socket_limit ()
              );

也可以手动强制执行Context()实例化:

为了确保全局Context()实例已启动并运行,在任何更高抽象的实例之前询问是否实现其他内部性(套接字,计数器,处理程序,端口管理等)

//  Initialize CZMQ zsys layer; this happens automatically when you create
//  a socket or an actor; however this call lets you force initialization
//  earlier, so e.g. logging is properly set-up before you start working.
//  Not threadsafe, so call only from main thread. Safe to call multiple
//  times. Returns global CZMQ context.
CZMQ_EXPORT void *
    zsys_init (void);

//  Optionally shut down the CZMQ zsys layer; this normally happens automatically
//  when the process exits; however this call lets you force a shutdown
//  earlier, avoiding any potential problems with atexit() ordering, especially
//  with Windows dlls.
CZMQ_EXPORT void
    zsys_shutdown (void);

并且可能更好地调整IO性能,在初始化状态下使用此权限:

//  Configure the number of I/O threads that ZeroMQ will use. A good
//  rule of thumb is one thread per gigabit of traffic in or out. The
//  default is 1, sufficient for most applications. If the environment
//  variable ZSYS_IO_THREADS is defined, that provides the default.
//  Note that this method is valid only before any socket is created.
CZMQ_EXPORT void
    zsys_set_io_threads (size_t io_threads);

这个手动实例化给了一个额外的好处,从具有实例句柄void指针,以便可以通过zmq_ctx_get()工具检查它的当前状态和形状:

void *aGlobalCONTEXT = zsys_init();

printf( "INF: current state of the global Context()-instance has:\n" );
printf( "     ( %d )-IO-threads ready\n", zmq_ctx_get( aGlobalCONTEXT,
                                                       ZMQ_IO_THREADS
                                                       )
        );
printf( "     ( %d )-ZMQ_BLOCKY state\n", zmq_ctx_get( aGlobalCONTEXT,
                                                       ZMQ_BLOCKY
                                                       )
        ); // may generate -1 in case DLL is << 4.2+
...


如果对信号处理不满意,可以设计并使用另一个:

//  Set interrupt handler; this saves the default handlers so that a
//  zsys_handler_reset () can restore them. If you call this multiple times
//  then the last handler will take affect. If handler_fn is NULL, disables
//  default SIGINT/SIGTERM handling in CZMQ.
CZMQ_EXPORT void
    zsys_handler_set (zsys_handler_fn *handler_fn);

哪里

//  Callback for interrupt signal handler
typedef void (zsys_handler_fn) (int signal_value);
© www.soinside.com 2019 - 2024. All rights reserved.