我一直在尝试使用 zmq eventloop 来设置服务器/客户端来进行
REQ / REP
消息传递。由于python 3不支持zmq提供的eventloop,所以我尝试使用tornado的eventloop来运行它。
我在使用 python 3 运行 zmqStream 和龙卷风事件循环时遇到问题。
我使用zmq的zmqStream和tornado的eventloop创建了服务器/客户端代码。客户端正在发送正确的消息,但服务器似乎没有响应消息请求。
服务器端代码:
from tornado import ioloop
import zmq
def echo(stream, msg):
stream.send_pyobj(msg)
ctx = zmq.Context()
socket = ctx.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:5678')
stream = ZMQStream(socket)
stream.on_recv(echo)
ioloop.IOLoop.current().start()
客户端代码:
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://127.0.0.1:5678")
for request in range (1,10):
print("Sending request ", request,"...")
socket.send_string("Hello")
# Get the reply.
message = socket.recv_pyobj()
print("Received reply ", request, "[", message, "]")
我期望服务器返回客户端发送的请求消息。但它只是不响应发送的请求。
服务器似乎没有响应
一个服务器端 SLOC,
stream = ZMQStream( socket )
调用一个函数,该函数没有 MCVE 文档记录,并且必须并且确实无法执行以产生任何结果:"ZMQStream" in dir()
通过 False
确认了这一点
补救措施: 修复 MCVE 并
print( zmq.zmq_version )
+ "ZMQStream" in dir()
确认
始终防止无限死锁,除非存在适当的理由不这样做,并在执行相应的
.bind()
或 .connect()
<aSocket>.setsockopt( zmq.LINGER, 0 )
之前进行设置。永远挂起的应用程序和未释放的(是的,你没看错,无限阻塞)资源在分布式计算系统中是不受欢迎的。
避免总是容易遇到的盲目分布式互锁。它会发生,只是人们永远不知道什么时候。您可以在 Stack Overflow 上阅读有关此内容的大量详细信息。
还有补救措施吗?可以(并且应该在可能的情况下)避免使用
REQ/REP
-s 的阻塞形式(公平的 .recv()
-s 在设计方面、资源方面都更聪明)可以在“抛出”之前使用额外的发送方信号任何一方进入无限阻塞
.poll()
-s (然而,网络传输失败或静默消息丢失的其他原因可能会导致软信令标记发送,这不会导致接收和相互死锁,其中硬连线行为将 REQ/REP 双方移动到等待另一方,以发送消息(对方永远不会发送该消息,因为它也在等待 .recv()
-从(仍在侦听)对面发送尚未收到的消息侧面))
最后但并非最不重要的一点:ZeroMQ Zen-of-Zero 还具有零保证 - 因为消息要么完全交付(无错误),要么根本不交付。如果一个人永远不会陷入相互僵局,那么
.recv()
REQ/REP
和
LINGER
)