对于ZMQ来说还很陌生,并试图构建非常基本的消息传递系统代码很大程度上基于here中的示例,但有些曲折
出于某种原因,在最后一条消息到达前端套接字(hbm
,tx
)代码之后,由于不确定我的来源而引发错误,所以>
对不起,我掉了很长的代码/输出,很重要的是要强调问题所在并帮助调试
在我的代码+输出下面:
s_frontend= "ipc://frontend.ipc" s_backend = "ipc://backend.ipc" import time def hbm(): """Worker task, using a REQ socket to do load-balancing.""" socket = zmq.Context().socket(zmq.REQ) socket.identity = u"hbm".encode("ascii") socket.connect(s_frontend) # Tell broker we're ready for work socket.send(b"READY") while True: msgs = socket.recv_multipart() print("hbm got something", msgs) if "BE READY" in msgs: print("hbm:BE is ready for some work") time.sleep(3) def tx(): """Worker task, using a REQ socket to do load-balancing.""" socket = zmq.Context().socket(zmq.REQ) socket.identity = u"txtx".encode("ascii") socket.connect(s_frontend) # Tell broker we're ready for work socket.send(b"READY") while True: msgs = socket.recv_multipart() print("tx got something ", msgs) if "BE READY" in msgs: print("tx:BE is ready for some work") time.sleep(3) def dev(): """Worker task, using a REQ socket to do load-balancing.""" socket = zmq.Context().socket(zmq.REQ) socket.identity = u"dev".encode("ascii") socket.connect(s_backend) # Tell broker we're ready for work socket.send(b"READY") def main(): # Prepare context and sockets context = zmq.Context.instance() frontend = context.socket(zmq.ROUTER) frontend.bind("ipc://frontend.ipc") backend = context.socket(zmq.ROUTER) backend.bind("ipc://backend.ipc") def start(task, *args): process = multiprocessing.Process(target=task, args=args) process.daemon = True process.start() start(hbm) start(tx) time.sleep(1) start(dev) clients = [] poller = zmq.Poller() poller.register(backend, zmq.POLLIN) #poller.register(frontend, zmq.POLLIN) all_is_ready = False while True: sockets = dict(poller.poll(timeout=1)) #print(sockets) soc = None if backend in sockets: print("got something from backend") msg = backend.recv_multipart() print(msg) print("adding frontend to poller") poller.register(frontend, zmq.POLLIN) print("backend is ready, notify frontend") elif frontend in sockets: print("got something from frontend") msg = frontend.recv_multipart() print(msg) clients.append(bytes(msg[0])) elif len(clients) == 2 and all_is_ready is False: all_is_ready = True for c in clients: print("sending response to", c) time.sleep(0.1) # just to prevent print overlap frontend.send_multipart([c, b"", b"BE READY"]) else: print("so much work, no rest, sleeping for 3") time.sleep(3) # Clean up backend.close() frontend.close() context.term() if __name__ == "__main__": main()
运行此代码将导致输出失败:
so much work, no rest, sleeping for 3 got something from backend ['dev', '', 'READY'] adding frontend to poller backend is ready, notify frontend got something from frontend ['hbm', '', 'READY'] got something from frontend ['txtx', '', 'READY'] sending response to hbm sending response to txtx hbm got something ['BE READY'] hbm:BE is ready for some work tx got something ['BE READY'] tx:BE is ready for some work so much work, no rest, sleeping for 3 Process Process-1: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 104, in hbm msgs = socket.recv_multipart() File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 475, in recv_multipart parts = [self.recv(flags, copy=copy, track=track)] File "zmq/backend/cython/socket.pyx", line 791, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 827, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 191, in zmq.backend.cython.socket._recv_copy File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc raise ZMQError(errno) ZMQError: Operation cannot be accomplished in current state so much work, no rest, sleeping for 3 Process Process-2: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 130, in tx msgs = socket.recv_multipart() File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 475, in recv_multipart parts = [self.recv(flags, copy=copy, track=track)] File "zmq/backend/cython/socket.pyx", line 791, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 827, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 191, in zmq.backend.cython.socket._recv_copy File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc raise ZMQError(errno) ZMQError: Operation cannot be accomplished in current state so much work, no rest, sleeping for 3 Traceback (most recent call last): File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 244, in <module> main() File "/home/vm/repo/pyzmq_examples/hal_example/example1.py", line 233, in main time.sleep(3) KeyboardInterrupt
以退出代码1完成的过程
对于ZMQ而言,我还很陌生,并且试图构建非常基本的消息传递系统代码完全是基于此处的示例,出于某种原因,在最后一条消息到达前端之后,由于某种原因而有些曲折...
Q