我正在做一个项目,我必须使用zmq_poll。但是我并没有完全理解它的作用。
所以我也尝试着去实现它。
zmq_pollitem_t timer_open(void){
zmq_pollitem_t items[1];
if( items[0].socket == nullptr ){
printf("error socket %s: %s\n", zmq_strerror(zmq_errno()));
return;
}
else{
items[0].socket = gsock;
}
items[0].fd = -1;
items[0].events = ZMQ_POLLIN;
// get a timer
items[0].fd = timerfd_create( CLOCK_REALTIME, 0 );
if( items[0].fd == -1 )
{
printf("timerfd_create() failed: errno=%d\n", errno);
items[0].socket = nullptr;
return;
}
int rc = zmq_poll(items,1,-1);
if(rc == -1){
printf("error poll %s: %s\n", zmq_strerror(zmq_errno()));
return;
}
else
return items[0];
}
我对这个话题很陌生, 我必须修改一个现有的项目, 用zmq的函数来替换. 在其他网站上我看到了一些例子,他们在一个无限循环中使用两个项目和zmq_poll函数。我看了文档,但还是不能正确理解这是如何工作的。而这是我实现的另外两个函数。我不知道这样的实现方式是否正确。
void timer_set(zmq_pollitem_t items[] , long msec, ipc_timer_mode_t mode ) {
struct itimerspec t;
...
timerfd_settime( items[0].fd , 0, &t, NULL );
}
void timer_close(zmq_pollitem_t items[]){
if( items[0].fd != -1 )
close(items[0].fd);
items[0].socket = nullptr;
}
我不确定是否需要zmq_poll函数 因为我使用的是定时器
EDIT:
void some_function_timer_example() {
// We want to wait on two timers
zmq_pollitem_t items[2] ;
// Setup first timer
ipc_timer_open_(&items[0]);
ipc_timer_set_(&items[0], 1000, IPC_TIMER_ONE_SHOT);
// Setup second timer
ipc_timer_open_(&items[1]);
ipc_timer_set_(&items[1], 1000, IPC_TIMER_ONE_SHOT);
// Now wait for the timers in a loop
while (1) {
//ipc_timer_set_(&items[0], 1000, IPC_TIMER_REPEAT);
//ipc_timer_set_(&items[1], 5000, IPC_TIMER_REPEAT);
int rc = zmq_poll (items, 2, -1);
assert (rc >= 0); /* Returned events will be stored in items[].revents */
if (items [0].revents & ZMQ_POLLIN) {
// Process task
std::cout << "revents: 1" << std::endl;
}
if (items [1].revents & ZMQ_POLLIN) {
// Process weather update
std::cout << "revents: 2" << std::endl;
}
}
}
现在它仍然非常快,而且没有等待。只有在开始的时候,它还在等待。而当timer_set在循环里面的时候,它就会正常等待,只有当等待时间相同的时候才会等待。ipc_timer_set(&items[1], 1000,...) and ipctimer_set(&items[0], 1000,...)
那我该如何改变这种情况?或者这就是正确的行为?
zmq_poll
工作原理和select一样,但是它允许一些额外的东西。例如,你可以选择普通的同步文件描述符,也可以选择特殊的异步套接字。
在你的例子中,你可以使用你已经尝试过的定时器fd,但是你需要做一些小的改变。
首先你必须考虑如何调用这些定时器。我想使用的情况是,如果你想创建多个定时器并等待它们。这将是典型的函数在yuor当前的代码,可能是使用循环的定时器(无论是使用select()或任何其他他们可能做).它将是这样的东西。
void some_function() {
// We want to wait on two timers
zmq_pollitem items[2];
// Setup first timer
ipc_timer_open(&item[0]);
ipc_timer_set(&item[0], 1000, IPC_TIMER_ONE_REPEAT);
// Setup second timer
ipc_timer_open(&item[1]);
ipc_timer_set(&item[1], 5000, IPC_TIMER_ONE_SHOT);
// Now wait for the timers in a loop
while (1) {
int rc = zmq_poll (items, 2, -1);
assert (rc >= 0); /* Returned events will be stored in items[].revents */
}
}
现在, 你需要修正ipc_timer_open. 这将是非常简单的 - 只需创建定时器fd。
// Takes a pointer to pre-allocated zmq_pollitem_t and returns 0 for success, -1 for error
int ipc_timer_open(zmq_pollitem_t *items){
items[0].socket = NULL;
items[0].events = ZMQ_POLLIN;
// get a timer
items[0].fd = timerfd_create( CLOCK_REALTIME, 0 );
if( items[0].fd == -1 )
{
printf("timerfd_create() failed: errno=%d\n", errno);
return -1; // error
}
return 0;
}
编辑: 添加作为评论的回复,因为这很长:来自文档。If both socket and fd are set in a single zmq_pollitem_t, the ØMQ socket referenced by socket shall take precedence and the value of fd shall be ignored.
所以如果你要传递fd, 你必须把socket设置为NULL. 我甚至不清楚在哪里 gsock
是来自。这是在文档中吗?我找不到。
还有什么时候能脱离while(1)循环?
这是应用逻辑,你要按照你的要求来编码。zmq_poll只是每当一个定时器打到的时候,就会一直返回。在这个例子中,每隔一秒,zmq_poll都会返回,因为第一个定时器(这是一个重复)一直在触发。但是在5秒的时候,它也会因为第二个定时器(这是一个单发)而返回。它由你决定何时退出循环。你想让这个循环无限地进行下去吗?你需要检查不同的条件来退出循环吗?你想这样做100次,然后返回吗?你可以在这段代码上面写任何你想要的逻辑。
那么返回的是什么样的事件呢?
ZMQ_POLLIN,因为定时器fs表现得像可读文件描述符。