This is basically our module, written in C, that uses ZeroMQ:
#include <zmq.h>
#include <syslog.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
/* REQ_TOPIC, SUB_ADDR, PUB_ADDR, SUB_MON_ADDR, PUB_MON_ADDR, zmq_ctx_t,
   zmq_sock_t and msg_buffer_t are defined elsewhere in our code. */

/* Log the ZMQ error, tear everything down and bail out of the calling function. */
#define RET_ERROR(x) syslog(LOG_ERR, x "%s", zmq_strerror(errno)); cleanupZMQ(); return false;

static zmq_ctx_t ctx = NULL;
static zmq_sock_t pub_sock = NULL;
static zmq_sock_t sub_sock = NULL;
static zmq_sock_t sub_mon_sock = NULL;
static zmq_sock_t pub_mon_sock = NULL;
static char* subscriptions[] = { REQ_TOPIC };
static size_t subscriptions_count = sizeof(subscriptions) / sizeof(subscriptions[0]);
static inline void cleanupZMQ() {
    if (pub_sock) {
        zmq_close(pub_sock);
        pub_sock = NULL;
    }
    if (sub_sock) {
        zmq_close(sub_sock);
        sub_sock = NULL;
    }
    if (sub_mon_sock) {
        zmq_close(sub_mon_sock);
        sub_mon_sock = NULL;
    }
    if (pub_mon_sock) {
        zmq_close(pub_mon_sock);
        pub_mon_sock = NULL;
    }
    if (ctx) {
        zmq_ctx_destroy(ctx);
        ctx = NULL;
    }
}
static bool waitForConnect(zmq_sock_t monitor) {
    zmq_msg_t msg;
    bool ret = true;
    bool connected = false;
    bool handshaked = false;
    do {
        /* First frame: 16-bit event id followed by a 32-bit event value. */
        zmq_msg_init(&msg);
        int rc = zmq_msg_recv(&msg, monitor, 0);
        if (rc < 0) {
            RET_ERROR("Error! Can not receive first frame from monitor: ")
        }
        uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
        uint16_t event;
        uint32_t value;
        memcpy(&event, data, sizeof(event));
        memcpy(&value, data + sizeof(event), sizeof(value));
        zmq_msg_close(&msg);
        /* Second frame: the affected endpoint address (not NUL-terminated). */
        zmq_msg_init(&msg);
        rc = zmq_msg_recv(&msg, monitor, 0);
        if (rc < 0) {
            RET_ERROR("Error! Can not receive second frame from monitor: ")
        }
        char addr[256] = { 0 };
        size_t addr_len = zmq_msg_size(&msg);
        if (addr_len >= sizeof(addr)) {
            addr_len = sizeof(addr) - 1;
        }
        memcpy(addr, zmq_msg_data(&msg), addr_len);
        zmq_msg_close(&msg);
        syslog(LOG_DEBUG, "Event: %u, Value: %u, Addr: %s", event, value, addr);
        if (event == ZMQ_EVENT_CONNECTED) {
            syslog(LOG_INFO, "Connected to '%s'.", addr);
            connected = true;
        } else if (event == ZMQ_EVENT_CONNECT_DELAYED) {
            syslog(LOG_NOTICE, "Connecting delayed!");
        } else if (event == ZMQ_EVENT_CONNECT_RETRIED) {
            syslog(LOG_NOTICE, "Connecting retried!");
        } else if ((event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL)
                || (event == ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL)
                || (event == ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)) {
            syslog(LOG_ERR, "Error! Handshake with '%s' failed: %s", addr, zmq_strerror(value));
            handshaked = true;
            ret = false;
        } else if (event == ZMQ_EVENT_HANDSHAKE_SUCCEEDED) {
            syslog(LOG_INFO, "Handshake with '%s' succeeded.", addr);
            handshaked = true;
            ret = true;
        } else {
            syslog(LOG_NOTICE, "Unexpected event: %u", event);
        }
    } while (!(connected && handshaked));
    return ret;
}
bool startZMQ() {
    if (ctx == NULL) {
        ctx = zmq_ctx_new();
        if (ctx == NULL) {
            RET_ERROR("Error! Can not open ZMQ context: ");
        }
    } else {
        syslog(LOG_INFO, "ZMQ is already started.");
    }
    if (sub_sock == NULL) {
        sub_sock = zmq_socket(ctx, ZMQ_SUB);
        if (sub_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ sub socket: ");
        }
        if (zmq_socket_monitor(sub_sock, SUB_MON_ADDR, ZMQ_EVENT_ALL)) {
            RET_ERROR("Error! Can not monitor ZMQ sub socket: ");
        }
        sub_mon_sock = zmq_socket(ctx, ZMQ_PAIR);
        if (sub_mon_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ sub-monitor socket: ");
        }
        if (zmq_connect(sub_mon_sock, SUB_MON_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ sub-monitor socket: ");
        }
        for (size_t i = 0; i < subscriptions_count; i++) {
            if (zmq_setsockopt(sub_sock, ZMQ_SUBSCRIBE, subscriptions[i], strlen(subscriptions[i]))) {
                syslog(LOG_ERR, "Error! Can not subscribe to topic '%s': %s", subscriptions[i], zmq_strerror(errno));
            } else {
                syslog(LOG_INFO, "Subscribed to '%s'.", subscriptions[i]);
            }
        }
        if (zmq_connect(sub_sock, SUB_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ sub socket: ");
        }
        if (!waitForConnect(sub_mon_sock)) {
            cleanupZMQ();
            return false;
        }
    } else {
        syslog(LOG_INFO, "Subscriber socket is already open.");
    }
    if (pub_sock == NULL) {
        pub_sock = zmq_socket(ctx, ZMQ_PUB);
        if (pub_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ pub socket: ");
        }
        if (zmq_socket_monitor(pub_sock, PUB_MON_ADDR, ZMQ_EVENT_ALL)) {
            RET_ERROR("Error! Can not monitor ZMQ pub socket: ");
        }
        pub_mon_sock = zmq_socket(ctx, ZMQ_PAIR);
        if (pub_mon_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ pub-monitor socket: ");
        }
        if (zmq_connect(pub_mon_sock, PUB_MON_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ pub-monitor socket: ");
        }
        if (zmq_connect(pub_sock, PUB_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ pub socket: ");
        }
        if (!waitForConnect(pub_mon_sock)) {
            cleanupZMQ();
            return false;
        }
    } else {
        syslog(LOG_INFO, "Publisher socket is already open.");
    }
    /* Without this delay the first messages published right after startup are lost,
       even though the monitor has already reported a successful handshake. */
    sleep(3);
    return true;
}
size_t sendZMQMsg(const char* topic, size_t topic_len, const msg_buffer_t msg, size_t msg_len) {
    if (pub_sock == NULL) {
        syslog(LOG_ERR, "Error! ZMQ publisher socket is not open.");
        return 0;
    }
    size_t sent = 0;
    /* Topic and payload go out as a single two-frame message. */
    int rc = zmq_send(pub_sock, topic, topic_len, ZMQ_SNDMORE);
    if (rc < 0) {
        syslog(LOG_ERR, "Error! Could not send ZMQ topic: %s", zmq_strerror(errno));
        return 0;
    }
    sent += rc;
    rc = zmq_send(pub_sock, msg, msg_len, 0);
    if (rc < 0) {
        syslog(LOG_ERR, "Error! Could not send ZMQ message: %s", zmq_strerror(errno));
        return 0;
    }
    sent += rc;
    return sent;
}
void endZMQ() {
    cleanupZMQ();
}
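For context, a caller drives this module roughly as in the sketch below. It is illustrative only: the "status" topic and the payload are made up, and we assume here that msg_buffer_t is a plain byte-pointer type.

/* Hypothetical driver for the module above; illustrative only. */
#include <string.h>
#include <stdint.h>

int main(void) {
    openlog("zmq-module", LOG_PID, LOG_USER);   /* syslog is used throughout the module */
    if (!startZMQ()) {                          /* opens the sockets, waits for the handshakes, then sleep(3) */
        return 1;
    }
    uint8_t payload[] = "hello";
    /* "status" and the payload are made-up examples; msg_buffer_t is assumed
       to be a byte-pointer type here. */
    size_t n = sendZMQMsg("status", strlen("status"), (msg_buffer_t)payload, sizeof(payload) - 1);
    syslog(LOG_INFO, "Sent %zu bytes.", n);
    endZMQ();
    closelog();
    return 0;
}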
As you can see, we had to add a sleep(3) at the end of startZMQ(). Without it, the first few messages we send are lost.
We are of course aware of the "slow joiner syndrome". We make sure the broker is up before anything connects to it, and that the subscribers connect before the publishers (with a three-second delay there as well). Still, the publishers have to wait those three seconds before they can use their sockets. Because of the central broker, publishers and subscribers do not know about each other, and we do not want them to have to connect to each other directly: there are many of both, and if everyone had to connect to everyone else the system would be essentially unmaintainable.
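The broker itself is not part of this module; conceptually it is just an XSUB/XPUB forwarder along the lines of the sketch below (the endpoints are placeholders, not our real configuration).

/* Sketch of a central XSUB/XPUB broker as described in the ZeroMQ guide.
   Illustrative only; the endpoints are placeholders. */
#include <zmq.h>

int run_broker(void) {
    void* broker_ctx = zmq_ctx_new();
    void* xsub = zmq_socket(broker_ctx, ZMQ_XSUB);   /* publishers connect here */
    void* xpub = zmq_socket(broker_ctx, ZMQ_XPUB);   /* subscribers connect here */
    zmq_bind(xsub, "tcp://*:5556");
    zmq_bind(xpub, "tcp://*:5557");
    zmq_proxy(xsub, xpub, NULL);                     /* blocks, forwarding messages and subscriptions */
    zmq_close(xsub);
    zmq_close(xpub);
    zmq_ctx_destroy(broker_ctx);
    return 0;
}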
We found this question and this one, and of course we read the guide, in particular the part where the guide itself uses a sleep(1) and where polling over a second socket pair is suggested (sketched at the end of this question).
Apart from that kind of polling, is there really no other way in this library to check whether your sockets are ready?
As you can see, we already catch the ZMQ events with ZMQ monitor sockets in the waitForConnect function. Is that not enough? Are we missing something here?
Fundamentally, you are limited by the fact that TCP (which underpins ZMQ) buffers. ZMQ does a lot to make sure that sent messages are delivered (heartbeats, whole-message delivery and so on) as long as the connection is up, but that does not solve everything.
Both TCP and ZMQ implement the "Actor model": data that has been sent is buffered by the transport, so the sender can return from send() before the receiver has called recv(). If you want an absolute guarantee of delivery (or confirmation of non-delivery), you need to pass some kind of acknowledgement message back to the sender.
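As a sketch of that idea (not a drop-in replacement for your module), a REQ/REP pair gives you such an acknowledgement almost for free, because the reply has to come back before the sender can continue:

/* Sketch: an explicit acknowledgement passed back to the sender over REQ/REP.
   zmq_recv() on the REQ side only returns once the peer has replied. */
#include <zmq.h>
#include <string.h>

int send_with_ack(void* req_sock, const char* msg, size_t len) {
    if (zmq_send(req_sock, msg, len, 0) < 0) {
        return -1;
    }
    char ack[4] = { 0 };
    if (zmq_recv(req_sock, ack, sizeof(ack) - 1, 0) < 0) {   /* blocks until the reply arrives */
        return -1;
    }
    return (strcmp(ack, "OK") == 0) ? 0 : -1;   /* "OK" is a made-up convention */
}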
What you end up with then is Communicating Sequential Processes (CSP). It is like the Actor model, except that the connection between sender and receiver does not buffer data in transit: a send() does not complete until the receiver's recv() has completed. This acts as an execution rendezvous (the act of sending means your code knows where the other end is in its execution, namely that its recv() has completed). That is especially useful if you have (quasi) real-time requirements: your sender can tell whether the receiver is keeping up!
CSP is implemented in Go and Rust; it has become somewhat fashionable again.
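To make that rendezvous idea concrete outside of those languages, here is a minimal unbuffered channel sketched in plain C with POSIX threads. It is purely illustrative, assumes a single sender and a single receiver, and has nothing to do with ZeroMQ's API:

/* A minimal unbuffered (CSP-style) channel: send() does not return until the
   receiver has taken the value, so each transfer is an execution rendezvous.
   Single sender, single receiver; purely illustrative. */
#include <pthread.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    const void* item;
    bool has_item;
    bool taken;
} rendezvous_t;

void rv_init(rendezvous_t* rv) {
    pthread_mutex_init(&rv->lock, NULL);
    pthread_cond_init(&rv->cond, NULL);
    rv->item = NULL;
    rv->has_item = false;
    rv->taken = false;
}

void rv_send(rendezvous_t* rv, const void* item) {
    pthread_mutex_lock(&rv->lock);
    rv->item = item;
    rv->has_item = true;
    rv->taken = false;
    pthread_cond_broadcast(&rv->cond);      /* wake the receiver */
    while (!rv->taken) {                    /* block until the receiver has the value */
        pthread_cond_wait(&rv->cond, &rv->lock);
    }
    pthread_mutex_unlock(&rv->lock);
}

const void* rv_recv(rendezvous_t* rv) {
    pthread_mutex_lock(&rv->lock);
    while (!rv->has_item) {                 /* block until a sender offers a value */
        pthread_cond_wait(&rv->cond, &rv->lock);
    }
    const void* item = rv->item;
    rv->has_item = false;
    rv->taken = true;                       /* rendezvous complete: unblock the sender */
    pthread_cond_broadcast(&rv->cond);
    pthread_mutex_unlock(&rv->lock);
    return item;
}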