从另一个线程中止 Zeromq recv() 或 poll() - 立即且无需等待超时

问题描述 投票:0回答:2

我在Python和C++中的许多配置中使用ZeroMQ,我想知道哪种是从另一个线程中止

recv()
poll()
的最优雅的方式(例如,在受控程序终止的情况下,但如果你想停止监听而不需要杀死套接字)。

this问题相反,我不仅仅是想避免无限等待,而是想从recv()

poll()
返回
立即

我知道我可以只提供

timeout
并中止
recv()
,如下所示:

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while _running:
    if poller.poll(timeout=100) == []:
        # maybe handle unwanted timout here..
        continue

    handle_message(socket.recv())

这将无休止地轮询套接字,直到

_running
从另一个线程设置为
False
- 最多 100 毫秒后我就完成了。

但这不太好 - 我有一个繁忙的循环,很难用这种方式来处理 real 超时,这可能是由于不需要的行为造成的。我也必须 等待超时,这在大多数情况下并不重要,但是..你知道我的意思。

当然我可以轮询额外的套接字以进行中止:

abort_socket = context.socket(zmq.SUB)
abort_socket.setsockopt(zmq.SUBSCRIBE, b"")
abort_socket.connect(<abort-publisher-endpoint>)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(abort_socket, zmq.POLLIN)

while _running:
    poll_result = poller.poll(timeout=1000)
    if socket in poll_result:
        handle_message(socket.recv())
    elif abort_socket in poll_result:
        break
    else:
        # handle real timeout here
        pass

但是这种方法也有缺点:

  • 有点冗长 - 在我触发中止的地方,我必须创建一个发布者并使用它来中止接收者
  • abort_socket
    只能从一个线程使用,所以我必须确保这一点

所以我的问题是:这是如何以好的方式完成的?

我可以以某种方式使用Python的

threading.Event
或其他东西吗?在其他语言中类似,而不是可以像这样传递给轮询器的中止套接字?:

def listener_thread_fn(event)

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    poller.register(event, zmq.POLLIN)

    while _running:
        poll_result = poller.poll(timeout=1000)
        if socket in poll_result:
            handle_message(socket.recv())
        elif event in poll_result:
            break
        else:
            # handle real timeout here
            pass

因此,您只需首先创建一个

theading.Event()
,将其传递给
listener_thread_fn
并从任何线程调用
event.set()
来中止。

python c++ zeromq
2个回答
1
投票

使用 Python 和

pyzmq
,在
recv()
poll()
中断时会引发错误;所以你可以在异常发生时简单地捕获它。带有
recv()
的示例:

while True:
    try:
        request = server.recv()
    except zmq.ZMQError as e:
        if e.errno == errno.EINTR:
            print('Clean exit now!')
            break
        raise

您可以轻松修改该代码以使用

poll()
代替(其过程相同)。另请注意,您需要:

import errno

0
投票

这个问题很旧,但仍然相关,据我所知,没有真正的答案。

我的解决方案(虽然更多的是一种解决方法)确实只是从线程外部关闭套接字,以立即中止

polller.poll()
socket.recv()
等。
然而,这将在 ZMQ 线程中引发异常:
ZMQError("not a socket", errno=128)
,我决定捕获并让它通过。这与 ZMQ 只使用单线程套接字的警告直接冲突,但我根本找不到另一种方法来立即取消查询。

代码如下:


class ZMQClient:

    # ...

    def thread_loop(self):
        # Called in a dedicated thread somewhere
        try:
            updated_sockets = dict(self._poller.poll(timeout=100))
        except ZMQError as err:
            if str(err) != "not a socket":
                raise err  # Don't show exception if context was killed
        else:
            if self.socket in updated_sockets:
                msg = self.socket.recv()
                # ...

    # ...

    def stop(self):
        self.socket.close()

我将其与 PySide6 QT 应用程序结合起来,在其中我从

closeEvent()
:

停止循环
class MyMainWindow(QMainWindow):

    def __init__():
        # Start ZMQ thread
        # self.client = ...
        # ...

    # ...

    def closeEvent(self, event):
        self.client.stop()
© www.soinside.com 2019 - 2024. All rights reserved.