我目前正在使用一个使用 ZeroMQ 的现有程序,其结构如下:
我需要在程序中向 Workers 发送请求并接收回复,但我遇到了困难,因为 Router 绑定到了 inproc 地址。
假设我无法修改现有程序,有没有办法实现这个功能?
目前的结构可以用下图来说明:
当前代码(C++)结构如下:
#include <chrono>
#include <functional>
#include <thread>
#include <vector>
#include <spdlog/spdlog.h>
#include <zmq.hpp>
void Proxy(zmq::context_t& context)
{
zmq::socket_t clients { context, zmq::socket_type::router };
zmq::socket_t workers { context, zmq::socket_type::dealer };
clients.bind("inproc://router");
workers.bind("tcp://0.0.0.0:5555");
zmq::proxy(clients, workers);
}
void Client(zmq::context_t& context)
{
using namespace std::chrono_literals;
zmq::socket_t socket { context, zmq::socket_type::req };
socket.connect("inproc://router");
for (;;)
{
socket.send(zmq::str_buffer("some request"));
zmq::message_t reply;
socket.recv(reply);
spdlog::trace("reply: {}", reply.to_string());
std::this_thread::sleep_for(1s);
}
}
void Worker(zmq::context_t& context) // actually located remotely
{
zmq::socket_t socket { context, zmq::socket_type::rep };
socket.connect("tcp://localhost:5555");
for (;;)
{
zmq::message_t request;
socket.recv(request);
spdlog::trace("request: {}", request.to_string());
socket.send(zmq::str_buffer("some reply"));
}
}
int main()
{
zmq::context_t context { 1 };
std::jthread proxy { Proxy, std::ref(context) };
std::vector<std::jthread> workers;
for (int i = 0; i < 4; ++i)
{
workers.emplace_back(Worker, std::ref(context));
}
std::vector<std::jthread> clients;
for (int i = 0; i < 4; ++i)
{
clients.emplace_back(Client, std::ref(context));
}
return 0;
}
我尝试了各种方法,但似乎都不起作用。
例如,我在程序中创建了一个 Router - Proxy - Dealer 结构,将 Dealer 连接到现有 Dealer 地址(0.0.0.0:5555),并尝试向 Router 发送请求。
虽然请求已发送,但工人似乎没有收到该请求。 (使用 Wireshark,我确认请求已发送给 Dealer,但 Dealer 并未将请求转发给 Workers。)
有没有什么方法可以实现这一点,而无需从头开始实现套接字通信?
感谢您的协助!
当你说“我无法修改现有程序”时,如果这意味着“零源代码更改”,那么你将无法利用现有路由器;它只能从进程内部访问。
但是,如果可以的话,您可以简单地添加一行来绑定代理的
clients
套接字两次,一次绑定到inproc
,第二次绑定到其他东西,无论是ipc
还是tcp
(ipc
将是更好,但不适用于 Windows)。这样,现有进程内的客户端可以通过 inproc
传输进行连接,同一台计算机上另一个进程中的客户端可以使用 ipc
连接到同一套接字。比如:
zmq::socket_t clients { context, zmq::socket_type::router };
zmq::socket_t workers { context, zmq::socket_type::dealer };
clients.bind("inproc://router");
clients.bind("ipc:///path/to/mynamedpipe");
workers.bind("tcp://0.0.0.0:5555");
新流程的客户会:
socket.connect("ipc:///path/to/mynamedpipe");
这不会以任何行为方式更改代理,并按原样保留应用程序的其余部分,这可能是您希望实现的目标。