我正在使用简单的一对一推拉工作者服务器python代码来发送和接收消息。工作者使用推套接字将消息发送到拉服务器。服务器处理单元不如工作单元强大,因此,在发送太多消息时,服务器的内存开始增大,直到系统杀死所有内容为止。我尝试如下设置接收器的高水位线:
worker_sock_in = ZMQ_CTXT.socket(zmq.PULL)
worker_sock_in.setsockopt(zmq.LINGER, 1000))
worker_sock_in.setsockopt(zmq.RCVTIMEO, 1000)) # detects if the link is broken
worker_sock_in.setsockopt(zmq.RCVHWM, 1000)
worker_sock_in_port = worker_sock_in.bind_to_random_port(listen_addr, port_start, port_end)
用于工作人员创建和发送消息的代码:
sock_dest = ZMQ_CTXT.socket(zmq.PUSH)
sock_dest.setsockopt(zmq.LINGER, 1000))
sock_dest.setsockopt(zmq.SNDTIMEO, 1000)) # detects if the link is broken
sock_dest.setsockopt(zmq.SNDHWM, 0) # never block on sending msg
sock_dest.connect(sock_dest_address)
# sends a msg
self.sock_dest.send(msg, zmq.NOBLOCK)
而且似乎可以解决问题,但是我猜是服务器只是丢弃了溢出消息,在我的情况下这是不可接受的。我已经基于此thread开发了我的开发软件,但是我不确定要了解答案的Additional Info
部分。
是否有一种推挽基础结构可以保证所有已发送的消息将被推套接字接收,而不会占用其内存或阻塞发件人?
我建议您在中间(发送方和接收方之间)添加一个代理,它将在给定的时间内保存已发送的消息。您必须制定代码逻辑来保存消息,并在服务器未收到特定消息时得到通知。 0mq无法提供保存或恢复丢失的消息的方法。