我在Python和C++中的许多配置中使用ZeroMQ,我想知道哪种是从另一个线程中止
recv()
或 poll()
的最优雅的方式(例如,在受控程序终止的情况下,但如果你想停止监听而不需要杀死套接字)。
与this问题相反,我不仅仅是想避免无限等待,而是想从recv()
或
poll()
返回立即。
我知道我可以只提供
timeout
并中止 recv()
,如下所示:
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
while _running:
if poller.poll(timeout=100) == []:
# maybe handle unwanted timout here..
continue
handle_message(socket.recv())
这将无休止地轮询套接字,直到
_running
从另一个线程设置为 False
- 最多 100 毫秒后我就完成了。
但这不太好 - 我有一个繁忙的循环,很难用这种方式来处理 real 超时,这可能是由于不需要的行为造成的。我也必须 等待超时,这在大多数情况下并不重要,但是..你知道我的意思。
当然我可以轮询额外的套接字以进行中止:
abort_socket = context.socket(zmq.SUB)
abort_socket.setsockopt(zmq.SUBSCRIBE, b"")
abort_socket.connect(<abort-publisher-endpoint>)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(abort_socket, zmq.POLLIN)
while _running:
poll_result = poller.poll(timeout=1000)
if socket in poll_result:
handle_message(socket.recv())
elif abort_socket in poll_result:
break
else:
# handle real timeout here
pass
但是这种方法也有缺点:
abort_socket
只能从一个线程使用,所以我必须确保这一点所以我的问题是:这是如何以好的方式完成的?
我可以以某种方式使用Python的
threading.Event
或其他东西吗?在其他语言中类似,而不是可以像这样传递给轮询器的中止套接字?:
def listener_thread_fn(event)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(event, zmq.POLLIN)
while _running:
poll_result = poller.poll(timeout=1000)
if socket in poll_result:
handle_message(socket.recv())
elif event in poll_result:
break
else:
# handle real timeout here
pass
因此,您只需首先创建一个
theading.Event()
,将其传递给 listener_thread_fn
并从任何线程调用 event.set()
来中止。
使用 Python 和
pyzmq
,在 recv()
或 poll()
中断时会引发错误;所以你可以在异常发生时简单地捕获它。带有 recv()
的示例:
while True:
try:
request = server.recv()
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
print('Clean exit now!')
break
raise
您可以轻松修改该代码以使用
poll()
代替(其过程相同)。另请注意,您需要:
import errno
这个问题很旧,但仍然相关,据我所知,没有真正的答案。
我的解决方案(虽然更多的是一种解决方法)确实只是从线程外部关闭套接字,以立即中止
polller.poll()
、socket.recv()
等。ZMQError("not a socket", errno=128)
,我决定捕获并让它通过。这与 ZMQ 只使用单线程套接字的警告直接冲突,但我根本找不到另一种方法来立即取消查询。
代码如下:
class ZMQClient:
# ...
def thread_loop(self):
# Called in a dedicated thread somewhere
try:
updated_sockets = dict(self._poller.poll(timeout=100))
except ZMQError as err:
if str(err) != "not a socket":
raise err # Don't show exception if context was killed
else:
if self.socket in updated_sockets:
msg = self.socket.recv()
# ...
# ...
def stop(self):
self.socket.close()
我将其与 PySide6 QT 应用程序结合起来,在其中我从
closeEvent()
: 停止循环
class MyMainWindow(QMainWindow):
def __init__():
# Start ZMQ thread
# self.client = ...
# ...
# ...
def closeEvent(self, event):
self.client.stop()