程序的多个实例中的ZeroMQ IPC

问题描述 投票:2回答:1

我在程序的多个实例之间的ZMQ中的进程间通信遇到一些问题

  • 我正在使用Linux OS
  • 我正在使用zeromq / cppzmq,libzmq的仅标头C ++绑定

如果我运行此应用程序的两个实例(例如在终端上运行),我将为其中一个提供一个参数作为侦听器,然后为另一个提供一个参数作为发送者。侦听器永远不会收到消息。我尝试了TCP和IPC无济于事。

#include <zmq.hpp>
#include <string>
#include <iostream>

int ListenMessage();
int SendMessage(std::string str);

zmq::context_t global_zmq_context(1);

int main(int argc, char* argv[] ) {
    std::string str = "Hello World";
    if (atoi(argv[1]) == 0) ListenMessage();
    else SendMessage(str);

    zmq_ctx_destroy(& global_zmq_context);
    return 0;
}


int SendMessage(std::string str) {
    assert(global_zmq_context);
    std::cout << "Sending \n";
    zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
    assert(publisher);

    int linger = 0;
    int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: connect failed: %s\n", strerror (errno));
        return -1;
    }

    zmq::message_t message(static_cast<const void*> (str.data()), str.size());
    rc = publisher.send(message);
    if (rc == -1) {
        printf ("E: send failed: %s\n", strerror (errno));
        return -1;
    }
    return 0;
}

int ListenMessage() {
    assert(global_zmq_context);
    std::cout << "Listening \n";
    zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
    assert(subscriber);

    int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc==0);

    int linger = 0;
    rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: bind failed: %s\n", strerror (errno));
        return -1;
    }

    std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
    while (true) {
        zmq::message_t rx_msg;
        // when timeout (the third argument here) is -1,
        // then block until ready to receive
        std::cout << "Still Listening before poll \n";
        zmq::poll(p.data(), 1, -1);
        std::cout << "Found an item \n"; // not reaching
        if (p[0].revents & ZMQ_POLLIN) {
            // received something on the first (only) socket
            subscriber.recv(&rx_msg);
            std::string rx_str;
            rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
            std::cout << "Received: " << rx_str << std::endl;
        }
    }
    return 0;
}

如果我使用两个线程运行程序的一个实例,此代码将起作用

    std::thread t_sub(ListenMessage);
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
    std::thread t_pub(SendMessage str);
    t_pub.join();
    t_sub.join();

但是我想知道为什么在运行程序的两个实例时上面的代码不起作用?

感谢您的帮助!

c++ tcp ipc zeromq
1个回答
1
投票

如果从未使用过ZeroMQ,在这里您可以先看看"ZeroMQ Principles in less than Five Seconds",然后再深入研究更多细节]]

Q

想知道为什么在运行程序的两个实例时,上面的代码不起作用?

代码永远不会飞过-与基于thread的基于或基于process[CONCURENT]处理无关。这是由于

I

其他P流程C通讯的错误设计引起的。ZeroMQ可以为此提供一种支持的传输类:{ ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// },再加上一种更智能的用于进程内通信的类,一种inproc://传输类为线程间通信准备,其中无栈通信可能是有史以来最低的延迟,而这仅仅是内存映射策略。

可以选择用于[[I

nter-

P过程-C通信的基于L3 / L2的网络堆栈,但是这是最“昂贵”的选择。核心错误:


给出该选择,任何单个进程(不是说一对进程)

将发生冲突

试图将其[[

AccessPoint .bind()

放入very相同 TCP / IP-address:port#其他错误:即使为了启动独奏程序,两个生成的线程都尝试
.bind()其私有

AccessPoint

,但没有尝试.connect()匹配的“相反”

AccessPoint

至少有一个必须成功.bind(),并且至少必须成功完成.connect()才能获得“通道”,这里是

PUB/SUB原型。待办事项:


确定适当的,正确的

Transport-Class

(最好避免过大的操作本地主机/进程内IPC的L3 / L2堆栈的权限)
    重构
  • Address:port#
  • 管理(为了使2个以上的进程在.bind()-上不失败,恢复为相同的(硬连线的)address:port#总是检测并适当地处理API调用返回的{PASS|FAIL} -s
  • 始终将[LINGER
  • 设置为零(您永远不会知道)
© www.soinside.com 2019 - 2024. All rights reserved.