服务器在对象列表中循环,这些对象上的数据实时更改。服务器每毫秒发布一次这些对象的所有新数据。即['Carrot', 'Banana', 'Mango', 'Eggplant']
客户端可以通过其名称订阅特定对象。self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'Carrot')
在线程上,客户端也会实时轮询这些数据
while True:
sockets = dict(self.poller.poll(poll_timeout))
if self.sub_socket in sockets and sockets[self.sub_socket] == zmq.POLLIN:
msg = self.sub_socket.recv_string(zmq.DONTWAIT)
// do something with the msg...
问题是当我订阅多个对象时,说Carrot, Eggplant & Banana
。我只是从Carrot
接收更改,有时从Banana
接收,因此在Eggplant
上很少见。我认为这是因为从服务器循环的顺序开始,例如当客户端轮询,接收Carrot,处理数据,然后再次轮询时,但是服务器已经完成了通过列表的发布,只是再次发布了Carrot然后客户端因此,民意调查仅接收胡萝卜。
所以我想为每个订阅创建单独的套接字?这是解决方案吗?我对ZMQ非常陌生,希望在这里提供一些帮助,谢谢!
Q:“这是解决方案吗?” ...为每个订阅创建单独的套接字?
[No。除非出于某些原因(我不知道,是出于其他原因)。
虽然ZeroMQ消息传递基础结构为每个消息传递提供了零保证,但这并不意味着消息在发送后消失或丢失。它只是说,期望对每一个交付的产品实行零保修,如果需要的话,可以增加这样想要的保修机制开销,如果其他人可以在没有时间的情况下工作,则他们无需支付费用。是否要以1.000.000的比例分配吗? 1合10.000.0000.000?这取决于许多因素,但是丢失消息不是ZeroMQ系统的常见或随机状态(并且有一些内部原因,其详细信息超出了本文的范围)。
仍然有疑问吗?
"ZeroMQ: Principles in less than Five Seconds"端发送均匀分布的琐碎消息
distributed-computing使用此测试,您将收到均匀分布的样本,其中每个订阅的PUB
数量相同(如果所有订阅的都相同)。
增长SAMPLEs = int( 1E6 ) aMsgSIZE = 2048 TOPICs = [ r'Carrot', r'Banana', r'Mango', r'Eggplant', r'' ] MASK = "{0:}" + aMsgSIZE * "_" for i in range( SAMPLEs ): PUB.send( MASK.format( TOPICs[np.random.randint( len( TOPICs ) - 1 ) ) ) time.sleep( 1E-3 )
可能会(在默认情况下TOPICs
-和aMsgSIZE
-实例下)会创建一些消息以告知您“丢失”,但同样,此消息应均匀分布。如果没有,将会有一些麻烦要深入。
Context()
数量统一未传递的消息数量将表明需要调整Socket()
和SAMPLEs
-instances参数的大小,以便提供足够的资源来安全地入队该数量的数据-流。但是,为单个订阅的
Topic
字符串设置更多Context()
-s并不能解决此资源管理瓶颈(如果存在)。如果主题分布均匀或不对称并且最后没有收到多少分数,请毫不犹豫地发布测试结果。一如既往地添加平台详细信息+ ZeroMQ版本,这很重要:o)